diff --git a/src/backend/BackendFactory.h b/src/backend/BackendFactory.h
index 0ba526ef..1cc7d106 100644
--- a/src/backend/BackendFactory.h
+++ b/src/backend/BackendFactory.h
@@ -40,6 +40,12 @@ make_Backend(boost::json::object const& config)
         throw std::runtime_error("Invalid database type");
 
     backend->open(readOnly);
+    auto rng = backend->hardFetchLedgerRangeNoThrow();
+    if (rng)
+    {
+        backend->updateRange(rng->minSequence);
+        backend->updateRange(rng->maxSequence);
+    }
     backend->checkFlagLedgers();
 
     BOOST_LOG_TRIVIAL(info)
diff --git a/src/backend/BackendIndexer.cpp b/src/backend/BackendIndexer.cpp
index 4aa17fa9..84ae7808 100644
--- a/src/backend/BackendIndexer.cpp
+++ b/src/backend/BackendIndexer.cpp
@@ -27,7 +27,7 @@ BackendIndexer::doKeysRepair(
     BackendInterface const& backend,
     std::optional<uint32_t> sequence)
 {
-    auto rng = backend.fetchLedgerRangeNoThrow();
+    auto rng = backend.fetchLedgerRange();
 
     if (!rng)
         return;
@@ -209,11 +209,10 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
     BOOST_LOG_TRIVIAL(debug)
         << __func__
         << " starting. sequence = " << std::to_string(ledgerSequence);
-    bool isFirst = false;
     auto keyIndex = getKeyIndexOfSeq(ledgerSequence);
     if (isFirst_)
     {
-        auto rng = backend.fetchLedgerRangeNoThrow();
+        auto rng = backend.fetchLedgerRange();
         if (rng && rng->minSequence != ledgerSequence)
             isFirst_ = false;
         else
diff --git a/src/backend/BackendInterface.cpp b/src/backend/BackendInterface.cpp
index f7bc29eb..dd1d11c3 100644
--- a/src/backend/BackendInterface.cpp
+++ b/src/backend/BackendInterface.cpp
@@ -55,14 +55,14 @@ BackendInterface::writeLedgerObject(
         std::move(book));
 }
 std::optional<LedgerRange>
-BackendInterface::fetchLedgerRangeNoThrow() const
+BackendInterface::hardFetchLedgerRangeNoThrow() const
 {
     BOOST_LOG_TRIVIAL(warning) << __func__;
     while (true)
     {
         try
         {
-            return fetchLedgerRange();
+            return hardFetchLedgerRange();
         }
         catch (DatabaseTimeout& t)
         {
@@ -205,6 +205,7 @@ BackendInterface::fetchLedgerPage(
     uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4));
     LedgerPage page;
     page.cursor = cursor;
+    int numCalls = 0;
     do
     {
         adjustedLimit = adjustedLimit >= 8192 ? 8192 : adjustedLimit * 2;
@@ -223,8 +224,7 @@ BackendInterface::fetchLedgerPage(
         page.objects.insert(
             page.objects.end(), partial.objects.begin(), partial.objects.end());
         page.cursor = partial.cursor;
-    } while (page.objects.size() < limit && page.cursor);
-
+    } while (page.objects.size() < limit && page.cursor && ++numCalls < 10);
     if (incomplete)
     {
         auto rng = fetchLedgerRange();
@@ -278,7 +278,7 @@ BackendInterface::fetchLedgerPage(
 void
 BackendInterface::checkFlagLedgers() const
 {
-    auto rng = fetchLedgerRangeNoThrow();
+    auto rng = hardFetchLedgerRangeNoThrow();
     if (rng)
     {
         bool prevComplete = true;
diff --git a/src/backend/BackendInterface.h b/src/backend/BackendInterface.h
index edf0f217..ccf3c0fa 100644
--- a/src/backend/BackendInterface.h
+++ b/src/backend/BackendInterface.h
@@ -66,6 +66,7 @@ class BackendInterface
 protected:
     mutable BackendIndexer indexer_;
     mutable bool isFirst_ = true;
+    mutable std::optional<LedgerRange> range;
 
 public:
     BackendInterface(boost::json::object const& config) : indexer_(config)
@@ -97,16 +98,15 @@ public:
     virtual std::optional<uint32_t>
     fetchLatestLedgerSequence() const = 0;
 
-    virtual std::optional<LedgerRange>
-    fetchLedgerRange() const = 0;
+    std::optional<LedgerRange>
+    fetchLedgerRange() const
+    {
+        return range;
+    }
 
     std::optional<ripple::Fees>
     fetchFees(std::uint32_t seq) const;
 
-    // Doesn't throw DatabaseTimeout. Should be used with care.
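[Review note] Taken together, the BackendFactory and BackendInterface hunks introduce a single pattern: the ledger range is read from the database once (hardFetchLedgerRangeNoThrow) and cached, after which fetchLedgerRange() is a pure in-memory read and updateRange() advances the cached maximum on each write. A minimal sketch of that pattern, with member and type names following the diff and everything else simplified:

#include <cstdint>
#include <optional>

struct LedgerRange
{
    uint32_t minSequence;
    uint32_t maxSequence;
};

struct RangeCache
{
    // Cheap accessor: never touches the database.
    std::optional<LedgerRange>
    fetchLedgerRange() const
    {
        return range;
    }

    // Called once at startup (seeded from the hard fetch) and after each
    // ledger is written.
    void
    updateRange(uint32_t newMax)
    {
        if (!range)
            range = {newMax, newMax};  // first ledger seeds both ends
        else
            range->maxSequence = newMax;
    }

private:
    std::optional<LedgerRange> range;
};
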
- std::optional - fetchLedgerRangeNoThrow() const; - // *** transaction methods virtual std::optional fetchTransaction(ripple::uint256 const& hash) const = 0; @@ -174,6 +174,20 @@ protected: friend std::shared_ptr make_Backend(boost::json::object const& config); friend class ::BackendTest_Basic_Test; + virtual std::optional + hardFetchLedgerRange() const = 0; + // Doesn't throw DatabaseTimeout. Should be used with care. + std::optional + hardFetchLedgerRangeNoThrow() const; + + void + updateRange(uint32_t newMax) + { + if (!range) + range = {newMax, newMax}; + else + range->maxSequence = newMax; + } virtual void writeLedger( diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 05e7e463..ea0f2b4f 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -2,24 +2,12 @@ #include #include #include -/* -namespace std { -template <> -struct hash -{ - std::size_t - operator()(const ripple::uint256& k) const noexcept - { - return boost::hash_range(k.begin(), k.end()); - } -}; -} // namespace std -*/ namespace Backend { template void processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) { + BOOST_LOG_TRIVIAL(debug) << __func__ << " Processing async write response"; CassandraBackend const& backend = *requestParams.backend; auto rc = cass_future_error_code(fut); if (rc != CASS_OK) @@ -45,180 +33,233 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) { BOOST_LOG_TRIVIAL(trace) << __func__ << " Succesfully inserted a record"; - backend.finishAsyncWrite(); - int remaining = --requestParams.refs; - if (remaining == 0) - delete &requestParams; + requestParams.finish(); } } -// Process the result of an asynchronous write. Retry on error -// @param fut cassandra future associated with the write -// @param cbData struct that holds the request parameters +template void -flatMapWriteCallback(CassFuture* fut, void* cbData) +processAsyncWrite(CassFuture* fut, void* cbData) { - CassandraBackend::WriteCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->write(params, retry); - }; - - processAsyncWriteResponse(requestParams, fut, func); + T& requestParams = *static_cast(cbData); + // TODO don't pass in func + processAsyncWriteResponse(requestParams, fut, requestParams.retry); } -/* - -void -retryWriteKey(CassandraBackend::WriteCallbackData& requestParams, bool isRetry) +template +struct WriteCallbackData { - auto const& backend = *requestParams.backend; - if (requestParams.isDeleted) - backend.writeDeletedKey(requestParams, true); - else - backend.writeKey(requestParams, true); -} + CassandraBackend const* backend; + T data; + std::function&, bool)> retry; + uint32_t currentRetries; + std::atomic refs = 1; -void -flatMapWriteKeyCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteCallbackData& requestParams = - *static_cast(cbData); - processAsyncWriteResponse(requestParams, fut, retryWriteKey); -} - -void -flatMapGetCreatedCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteCallbackData& requestParams = - *static_cast(cbData); - CassandraBackend const& backend = *requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - BOOST_LOG_TRIVIAL(info) << __func__; + WriteCallbackData(CassandraBackend const* b, T&& d, B bind) + : backend(b), data(std::move(d)) { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! 
Cassandra insert error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.ioContext_, std::chrono::steady_clock::now() + wait); - timer->async_wait([timer, &requestParams, &backend]( - const boost::system::error_code& error) { - backend.writeKey(requestParams, true); - }); - } - else - { - auto finish = [&backend]() { - --(backend.numRequestsOutstanding_); - - backend.throttleCv_.notify_all(); - if (backend.numRequestsOutstanding_ == 0) - backend.syncCv_.notify_all(); + retry = [bind, this](auto& params, bool isRetry) { + auto statement = bind(params); + backend->executeAsyncWrite( + statement, + processAsyncWrite< + typename std::remove_reference::type>, + params, + isRetry); }; - CassandraResult result{cass_future_get_result(fut)}; + } + virtual void + start() + { + retry(*this, false); + } - if (!result) + virtual void + finish() + { + backend->finishAsyncWrite(); + int remaining = --refs; + if (remaining == 0) + delete this; + } + virtual ~WriteCallbackData() + { + } +}; +template +struct BulkWriteCallbackData : public WriteCallbackData +{ + std::mutex& mtx; + std::condition_variable& cv; + std::atomic_int& numRemaining; + BulkWriteCallbackData( + CassandraBackend const* b, + T&& d, + B bind, + std::atomic_int& r, + std::mutex& m, + std::condition_variable& c) + : WriteCallbackData(b, std::move(d), bind) + , numRemaining(r) + , mtx(m) + , cv(c) + { + } + void + start() override + { + this->retry(*this, true); + } + + void + finish() override + { + // TODO: it would be nice to avoid this lock. 
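+        // (added commentary) The lock is what makes the wakeup reliable:
+        // the bulk writer re-checks numRemaining under this mutex before
+        // sleeping, so decrementing and notifying without holding it could
+        // slip between that check and the wait and lose the wakeup.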
+ std::lock_guard lck(mtx); + if (--numRemaining == 0) + cv.notify_one(); + } + ~BulkWriteCallbackData() + { + } +}; + +template +void +makeAndExecuteAsyncWrite(CassandraBackend const* b, T&& d, B bind) +{ + auto* cb = new WriteCallbackData(b, std::move(d), bind); + cb->start(); +} +template +std::shared_ptr> +makeAndExecuteBulkAsyncWrite( + CassandraBackend const* b, + T&& d, + B bind, + std::atomic_int& r, + std::mutex& m, + std::condition_variable& c) +{ + auto cb = std::make_shared>( + b, std::move(d), bind, r, m, c); + cb->start(); + return cb; +} +void +CassandraBackend::doWriteLedgerObject( + std::string&& key, + uint32_t seq, + std::string&& blob, + bool isCreated, + bool isDeleted, + std::optional&& book) const +{ + BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple(std::move(key), seq, std::move(blob))), + [this](auto& params) { + auto& [key, sequence, blob] = params.data; + + CassandraStatement statement{insertObject_}; + statement.bindBytes(key); + statement.bindInt(sequence); + statement.bindBytes(blob); + return statement; + }); +} +void +CassandraBackend::writeLedger( + ripple::LedgerInfo const& ledgerInfo, + std::string&& header, + bool isFirst) const +{ + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple(ledgerInfo.seq, std::move(header))), + [this](auto& params) { + auto& [sequence, header] = params.data; + CassandraStatement statement{insertLedgerHeader_}; + statement.bindInt(sequence); + statement.bindBytes(header); + return statement; + }); + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple(ledgerInfo.hash, ledgerInfo.seq)), + [this](auto& params) { + auto& [hash, sequence] = params.data; + CassandraStatement statement{insertLedgerHash_}; + statement.bindBytes(hash); + statement.bindInt(sequence); + return statement; + }); + ledgerSequence_ = ledgerInfo.seq; + isFirstLedger_ = isFirst; +} +void +CassandraBackend::writeAccountTransactions( + std::vector&& data) const +{ + for (auto& record : data) + { + for (auto& account : record.accounts) { - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc - << ", " << cass_error_desc(rc); - finish(); - return; + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple( + std::move(account), + record.ledgerSequence, + record.transactionIndex, + record.txHash)), + [this](auto& params) { + CassandraStatement statement(insertAccountTx_); + auto& [account, lgrSeq, txnIdx, hash] = params.data; + statement.bindBytes(account); + uint32_t index = lgrSeq >> 20 << 20; + statement.bindUInt(index); + + statement.bindIntTuple(lgrSeq, txnIdx); + statement.bindBytes(hash); + return statement; + }); } - requestParams.createdSequence = result.getUInt32(); - backend.writeDeletedKey(requestParams, false); } } -*/ void -flatMapWriteTransactionCallback(CassFuture* fut, void* cbData) +CassandraBackend::writeTransaction( + std::string&& hash, + uint32_t seq, + std::string&& transaction, + std::string&& metadata) const { - CassandraBackend::WriteTransactionCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeTransaction(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} -void -flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteAccountTxCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeAccountTx(params, retry); - }; - 
processAsyncWriteResponse(requestParams, fut, func); -} -void -flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteLedgerHeaderCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeLedgerHeader(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} + BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; + std::string hashCpy = hash; -void -flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteLedgerHashCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeLedgerHash(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} - -// Process the result of an asynchronous read. Retry on error -// @param fut cassandra future associated with the read -// @param cbData struct that holds the request parameters -void -flatMapReadCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::ReadCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params) { params.backend.read(params); }; - CassandraAsyncResult asyncResult{requestParams, fut, func}; - if (asyncResult.timedOut()) - requestParams.result.transaction = {0}; - CassandraResult& result = asyncResult.getResult(); - - if (!!result) - { - requestParams.result = { - result.getBytes(), result.getBytes(), result.getUInt32()}; - } -} - -// Process the result of an asynchronous read. Retry on error -// @param fut cassandra future associated with the read -// @param cbData struct that holds the request parameters -void -flatMapReadObjectCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::ReadObjectCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params) { params.backend.readObject(params); }; - CassandraAsyncResult asyncResult{requestParams, fut, func}; - if (asyncResult.timedOut()) - requestParams.result = {0}; - CassandraResult& result = asyncResult.getResult(); - - if (!!result) - { - requestParams.result = result.getBytes(); - } + makeAndExecuteAsyncWrite( + this, std::move(std::make_pair(seq, hash)), [this](auto& params) { + CassandraStatement statement{insertLedgerTransaction_}; + statement.bindInt(params.data.first); + statement.bindBytes(params.data.second); + return statement; + }); + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple( + std::move(hash), seq, std::move(transaction), std::move(metadata))), + [this](auto& params) { + CassandraStatement statement{insertTransaction_}; + auto& [hash, sequence, transaction, metadata] = params.data; + statement.bindBytes(hash); + statement.bindInt(sequence); + statement.bindBytes(transaction); + statement.bindBytes(metadata); + return statement; + }); } std::optional -CassandraBackend::fetchLedgerRange() const +CassandraBackend::hardFetchLedgerRange() const { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; CassandraStatement statement{selectLedgerRange_}; @@ -243,21 +284,93 @@ CassandraBackend::fetchLedgerRange() const std::vector CassandraBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const { - CassandraStatement statement{selectAllTransactionsInLedger_}; - statement.bindInt(ledgerSequence); - CassandraResult result = executeSyncRead(statement); - if (!result) + auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence); + return fetchTransactions(hashes); +} + +struct ReadCallbackData +{ + std::function onSuccess; + std::atomic_int& 
numOutstanding; + std::mutex& mtx; + std::condition_variable& cv; + bool errored = false; + ReadCallbackData( + std::atomic_int& numOutstanding, + std::mutex& m, + std::condition_variable& cv, + std::function onSuccess) + : numOutstanding(numOutstanding), mtx(m), cv(cv), onSuccess(onSuccess) { - BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; - return {}; } - std::vector txns; - do + + void + finish(CassFuture* fut) { - txns.push_back( - {result.getBytes(), result.getBytes(), result.getUInt32()}); - } while (result.nextRow()); - return txns; + CassError rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + errored = true; + } + else + { + CassandraResult result{cass_future_get_result(fut)}; + onSuccess(result); + } + std::lock_guard lck(mtx); + if (--numOutstanding == 0) + cv.notify_one(); + } +}; +void +processAsyncRead(CassFuture* fut, void* cbData) +{ + ReadCallbackData cb = *static_cast(cbData); + cb.finish(fut); +} +std::vector +CassandraBackend::fetchTransactions( + std::vector const& hashes) const +{ + std::size_t const numHashes = hashes.size(); + std::atomic_int numOutstanding = numHashes; + std::condition_variable cv; + std::mutex mtx; + std::vector results{numHashes}; + std::vector> cbs; + cbs.reserve(numHashes); + auto start = std::chrono::system_clock::now(); + for (std::size_t i = 0; i < hashes.size(); ++i) + { + CassandraStatement statement{selectTransaction_}; + statement.bindBytes(hashes[i]); + cbs.push_back(std::make_shared( + numOutstanding, mtx, cv, [i, &results](auto& result) { + if (result.hasResult()) + results[i] = { + result.getBytes(), + result.getBytes(), + result.getUInt32()}; + })); + executeAsyncRead(statement, processAsyncRead, *cbs[i]); + } + assert(results.size() == cbs.size()); + + std::unique_lock lck(mtx); + cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); + auto end = std::chrono::system_clock::now(); + for (auto const& cb : cbs) + { + if (cb->errored) + throw DatabaseTimeout(); + } + + BOOST_LOG_TRIVIAL(debug) + << "Fetched " << numHashes << " transactions from Cassandra in " + << std::chrono::duration_cast(end - start) + .count() + << " milliseconds"; + return results; } std::vector CassandraBackend::fetchAllTransactionHashesInLedger( @@ -265,7 +378,9 @@ CassandraBackend::fetchAllTransactionHashesInLedger( { CassandraStatement statement{selectAllTransactionHashesInLedger_}; statement.bindInt(ledgerSequence); + auto start = std::chrono::system_clock::now(); CassandraResult result = executeSyncRead(statement); + auto end = std::chrono::system_clock::now(); if (!result) { BOOST_LOG_TRIVIAL(error) @@ -278,133 +393,114 @@ CassandraBackend::fetchAllTransactionHashesInLedger( { hashes.push_back(result.getUInt256()); } while (result.nextRow()); + BOOST_LOG_TRIVIAL(debug) + << "Fetched " << hashes.size() + << " transaction hashes from Cassandra in " + << std::chrono::duration_cast(end - start) + .count() + << " milliseconds"; return hashes; } -struct ReadDiffCallbackData +std::pair< + std::vector, + std::optional> +CassandraBackend::fetchAccountTransactions( + ripple::AccountID const& account, + std::uint32_t limit, + std::optional const& cursorIn) const { - CassandraBackend const& backend; - uint32_t sequence; - std::vector& result; - std::mutex& mtx; - std::condition_variable& cv; - - std::atomic_uint32_t& numFinished; - size_t batchSize; - - ReadDiffCallbackData( - CassandraBackend const& backend, - uint32_t sequence, - std::vector& result, - std::mutex& mtx, - std::condition_variable& cv, - std::atomic_uint32_t& numFinished, 
- size_t batchSize) - : backend(backend) - , sequence(sequence) - , result(result) - , mtx(mtx) - , cv(cv) - , numFinished(numFinished) - , batchSize(batchSize) - { - } -}; - -void -flatMapReadDiffCallback(CassFuture* fut, void* cbData); -void -readDiff(ReadDiffCallbackData& data) -{ - CassandraStatement statement{ - data.backend.getSelectLedgerDiffPreparedStatement()}; - statement.bindInt(data.sequence); - - data.backend.executeAsyncRead(statement, flatMapReadDiffCallback, data); -} -// Process the result of an asynchronous read. Retry on error -// @param fut cassandra future associated with the read -// @param cbData struct that holds the request parameters -void -flatMapReadDiffCallback(CassFuture* fut, void* cbData) -{ - ReadDiffCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params) { readDiff(params); }; - CassandraAsyncResult asyncResult{requestParams, fut, func, true}; - CassandraResult& result = asyncResult.getResult(); - - if (!!result) - { - do - { - requestParams.result.push_back( - {result.getUInt256(), result.getBytes()}); - } while (result.nextRow()); - } -} -std::map> -CassandraBackend::fetchLedgerDiffs(std::vector const& sequences) const -{ - std::atomic_uint32_t numFinished = 0; - std::condition_variable cv; - std::mutex mtx; - std::map> results; - std::vector> cbs; - cbs.reserve(sequences.size()); - for (std::size_t i = 0; i < sequences.size(); ++i) - { - cbs.push_back(std::make_shared( - *this, - sequences[i], - results[sequences[i]], - mtx, - cv, - numFinished, - sequences.size())); - readDiff(*cbs[i]); - } - assert(results.size() == cbs.size()); - - std::unique_lock lck(mtx); - cv.wait(lck, [&numFinished, &sequences]() { - return numFinished == sequences.size(); - }); - - return results; -} - -std::vector -CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const -{ - CassandraStatement statement{selectLedgerDiff_}; - statement.bindInt(ledgerSequence); - - auto start = std::chrono::system_clock::now(); - CassandraResult result = executeSyncRead(statement); - - auto mid = std::chrono::system_clock::now(); - if (!result) - return {}; - std::vector objects; + auto rng = fetchLedgerRange(); + if (!rng) + return {{}, {}}; + std::pair< + std::vector, + std::optional> + res; + auto keylet = ripple::keylet::account(account); + std::vector hashes; + auto cursor = cursorIn; do { - objects.push_back({result.getUInt256(), result.getBytes()}); - } while (result.nextRow()); - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Fetched diff. Fetch time = " - << std::to_string((mid - start).count() / 1000000000.0) - << " . 
total time = " - << std::to_string((end - start).count() / 1000000000.0); - return objects; + { + CassandraStatement statement{selectAccountTx_}; + statement.bindBytes(account); + if (cursor) + { + statement.bindUInt(cursor->ledgerSequence >> 20 << 20); + statement.bindIntTuple( + cursor->ledgerSequence, cursor->transactionIndex); + } + else + { + statement.bindUInt(rng->maxSequence >> 20 << 20); + + statement.bindIntTuple(INT32_MAX, INT32_MAX); + } + uint32_t adjustedLimit = limit - hashes.size(); + statement.bindUInt(adjustedLimit); + CassandraResult result = executeSyncRead(statement); + if (!result.hasResult()) + { + BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned"; + break; + } + + size_t numRows = result.numRows(); + + BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows); + do + { + hashes.push_back(result.getUInt256()); + --numRows; + if (numRows == 0) + { + BOOST_LOG_TRIVIAL(debug) << __func__ << " setting cursor"; + auto [lgrSeq, txnIdx] = result.getInt64Tuple(); + cursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx}; + } + } while (result.nextRow()); + } + if (hashes.size() < limit) + { + BOOST_LOG_TRIVIAL(debug) << __func__ << " less than limit"; + uint32_t seq = cursor->ledgerSequence; + seq = ((seq >> 20) - 1) << 20; + cursor->ledgerSequence = seq; + cursor->transactionIndex = INT32_MAX; + BOOST_LOG_TRIVIAL(debug) << __func__ << " walking back"; + CassandraStatement statement{selectObject_}; + statement.bindBytes(keylet.key); + statement.bindInt(seq); + CassandraResult result = executeSyncRead(statement); + if (!result) + { + BOOST_LOG_TRIVIAL(debug) + << __func__ << " account no longer exists"; + cursor = {}; + break; + } + } + } while (hashes.size() < limit && + cursor->ledgerSequence >= rng->minSequence); + + auto txns = fetchTransactions(hashes); + BOOST_LOG_TRIVIAL(debug) << __func__ << "txns = " << txns.size(); + if (txns.size() >= limit) + { + BOOST_LOG_TRIVIAL(debug) << __func__ << " returning cursor"; + return {txns, cursor}; + } + return {txns, {}}; } + LedgerPage CassandraBackend::doFetchLedgerPage( - std::optional const& cursor, + std::optional const& cursorIn, std::uint32_t ledgerSequence, std::uint32_t limit) const { + std::optional cursor = cursorIn; auto index = getKeyIndexOfSeq(ledgerSequence); if (!index) return {}; @@ -417,16 +513,13 @@ CassandraBackend::doFetchLedgerPage( << __func__ << " - Cursor = " << ripple::strHex(*cursor); CassandraStatement statement{selectKeys_}; statement.bindInt(index->keyIndex); - if (cursor) - { - auto thisCursor = *cursor; - statement.bindBytes(++thisCursor); - } - else + if (!cursor) { ripple::uint256 zero; - statement.bindBytes(zero); + cursor = zero; } + statement.bindBytes(cursor->data(), 1); + statement.bindBytes(*cursor); statement.bindUInt(limit + 1); CassandraResult result = executeSyncRead(statement); if (!!result) @@ -445,6 +538,12 @@ CassandraBackend::doFetchLedgerPage( page.cursor = keys.back(); ++(*page.cursor); } + else if (cursor->data()[0] != 0xFF) + { + ripple::uint256 zero; + zero.data()[0] = cursor->data()[0] + 1; + page.cursor = zero; + } auto objects = fetchLedgerObjects(keys, ledgerSequence); if (objects.size() != keys.size()) throw std::runtime_error("Mismatch in size of objects and keys"); @@ -483,25 +582,31 @@ CassandraBackend::fetchLedgerObjects( std::size_t const numKeys = keys.size(); BOOST_LOG_TRIVIAL(trace) << "Fetching " << numKeys << " records from Cassandra"; - std::atomic_uint32_t numFinished = 0; + std::atomic_int numOutstanding = numKeys; std::condition_variable cv; 
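[Review note] The rewritten fetchTransactions above and fetchLedgerObjects (continuing below) share one scatter/gather shape: fire all reads, block on a counter, then surface any failure. A condensed sketch of the wait-and-surface half, under stated assumptions (DatabaseTimeout and ReadCb here are stand-ins for the backend's real types):

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>

struct DatabaseTimeout {};                  // stand-in for the real exception
struct ReadCb { bool errored = false; };    // stand-in for ReadCallbackData

// Block until the last driver callback decrements the counter, then convert
// any per-read failure into DatabaseTimeout; the client, not the backend,
// decides whether to retry.
void
waitForReads(
    std::vector<std::shared_ptr<ReadCb>> const& cbs,
    std::atomic_int& numOutstanding,
    std::mutex& mtx,
    std::condition_variable& cv)
{
    std::unique_lock lck(mtx);
    cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; });
    for (auto const& cb : cbs)
        if (cb->errored)
            throw DatabaseTimeout();
}
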
std::mutex mtx; std::vector results{numKeys}; - std::vector> cbs; + std::vector> cbs; cbs.reserve(numKeys); for (std::size_t i = 0; i < keys.size(); ++i) { - cbs.push_back(std::make_shared( - *this, keys[i], sequence, results[i], mtx, cv, numFinished, numKeys)); - readObject(*cbs[i]); + cbs.push_back(std::make_shared( + numOutstanding, mtx, cv, [i, &results](auto& result) { + if (result.hasResult()) + results[i] = result.getBytes(); + })); + CassandraStatement statement{selectObject_}; + statement.bindBytes(keys[i]); + statement.bindInt(sequence); + executeAsyncRead(statement, processAsyncRead, *cbs[i]); } assert(results.size() == cbs.size()); std::unique_lock lck(mtx); - cv.wait(lck, [&numFinished, &numKeys]() { return numFinished == numKeys; }); - for (auto const& res : results) + cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); + for (auto const& cb : cbs) { - if (res.size() == 1 && res[0] == 0) + if (cb->errored) throw DatabaseTimeout(); } @@ -509,241 +614,6 @@ CassandraBackend::fetchLedgerObjects( << "Fetched " << numKeys << " records from Cassandra"; return results; } -struct WriteBookCallbackData -{ - CassandraBackend const& backend; - ripple::uint256 book; - ripple::uint256 offerKey; - uint32_t ledgerSequence; - std::condition_variable& cv; - std::atomic_uint32_t& numOutstanding; - std::mutex& mtx; - uint32_t currentRetries = 0; - WriteBookCallbackData( - CassandraBackend const& backend, - ripple::uint256 const& book, - ripple::uint256 const& offerKey, - uint32_t ledgerSequence, - std::condition_variable& cv, - std::mutex& mtx, - std::atomic_uint32_t& numOutstanding) - : backend(backend) - , book(book) - , offerKey(offerKey) - , ledgerSequence(ledgerSequence) - , cv(cv) - , mtx(mtx) - , numOutstanding(numOutstanding) - - { - } -}; -void -writeBookCallback(CassFuture* fut, void* cbData); -void -writeBook(WriteBookCallbackData& cb) -{ - CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()}; - statement.bindBytes(cb.book.data(), 24); - statement.bindInt(cb.ledgerSequence); - statement.bindBytes(cb.book.data() + 24, 8); - statement.bindBytes(cb.offerKey); - // Passing isRetry as true bypasses incrementing numOutstanding - cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true); -} -void -writeBookCallback(CassFuture* fut, void* cbData) -{ - WriteBookCallbackData& requestParams = - *static_cast(cbData); - - CassandraBackend const& backend = requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! 
Cassandra insert book error: " << rc << ", " - << cass_error_desc(rc) << ", retrying in " << wait.count() - << " milliseconds"; - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.getIOContext(), - std::chrono::steady_clock::now() + wait); - timer->async_wait( - [timer, &requestParams](const boost::system::error_code& error) { - writeBook(requestParams); - }); - } - else - { - BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book"; - { - std::lock_guard lck(requestParams.mtx); - --requestParams.numOutstanding; - requestParams.cv.notify_one(); - } - } -} - -struct WriteKeyCallbackData -{ - CassandraBackend const& backend; - ripple::uint256 key; - uint32_t ledgerSequence; - std::condition_variable& cv; - std::atomic_uint32_t& numRemaining; - std::mutex& mtx; - uint32_t currentRetries = 0; - WriteKeyCallbackData( - CassandraBackend const& backend, - ripple::uint256 const& key, - uint32_t ledgerSequence, - std::condition_variable& cv, - std::mutex& mtx, - std::atomic_uint32_t& numRemaining) - : backend(backend) - , key(key) - , ledgerSequence(ledgerSequence) - , cv(cv) - , mtx(mtx) - , numRemaining(numRemaining) - - { - } -}; -struct OnlineDeleteCallbackData -{ - CassandraBackend const& backend; - ripple::uint256 key; - uint32_t ledgerSequence; - std::vector object; - std::condition_variable& cv; - std::atomic_uint32_t& numOutstanding; - std::mutex& mtx; - uint32_t currentRetries = 0; - OnlineDeleteCallbackData( - CassandraBackend const& backend, - ripple::uint256&& key, - uint32_t ledgerSequence, - std::vector&& object, - std::condition_variable& cv, - std::mutex& mtx, - std::atomic_uint32_t& numOutstanding) - : backend(backend) - , key(std::move(key)) - , ledgerSequence(ledgerSequence) - , object(std::move(object)) - , cv(cv) - , mtx(mtx) - , numOutstanding(numOutstanding) - - { - } -}; -void -onlineDeleteCallback(CassFuture* fut, void* cbData); -void -onlineDelete(OnlineDeleteCallbackData& cb) -{ - { - CassandraStatement statement{ - cb.backend.getInsertObjectPreparedStatement()}; - statement.bindBytes(cb.key); - statement.bindInt(cb.ledgerSequence); - statement.bindBytes(cb.object); - - cb.backend.executeAsyncWrite(statement, onlineDeleteCallback, cb, true); - } -} -void -onlineDeleteCallback(CassFuture* fut, void* cbData) -{ - OnlineDeleteCallbackData& requestParams = - *static_cast(cbData); - - CassandraBackend const& backend = requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! 
Cassandra insert book error: " << rc << ", " - << cass_error_desc(rc) << ", retrying in " << wait.count() - << " milliseconds"; - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.getIOContext(), - std::chrono::steady_clock::now() + wait); - timer->async_wait( - [timer, &requestParams](const boost::system::error_code& error) { - onlineDelete(requestParams); - }); - } - else - { - BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book"; - { - std::lock_guard lck(requestParams.mtx); - --requestParams.numOutstanding; - requestParams.cv.notify_one(); - } - } -} -void -writeKeyCallback(CassFuture* fut, void* cbData); -void -writeKey(WriteKeyCallbackData& cb) -{ - CassandraStatement statement{cb.backend.getInsertKeyPreparedStatement()}; - statement.bindInt(cb.ledgerSequence); - statement.bindBytes(cb.key); - // Passing isRetry as true bypasses incrementing numOutstanding - cb.backend.executeAsyncWrite(statement, writeKeyCallback, cb, true); -} -void -writeKeyCallback(CassFuture* fut, void* cbData) -{ - WriteKeyCallbackData& requestParams = - *static_cast(cbData); - - CassandraBackend const& backend = requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert key error: " << rc << ", " - << cass_error_desc(rc) << ", retrying in " << wait.count() - << " milliseconds"; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.getIOContext(), - std::chrono::steady_clock::now() + wait); - timer->async_wait( - [timer, &requestParams](const boost::system::error_code& error) { - writeKey(requestParams); - }); - } - else - { - BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a key"; - { - std::lock_guard lck(requestParams.mtx); - --requestParams.numRemaining; - requestParams.cv.notify_one(); - } - } -} bool CassandraBackend::writeKeys( @@ -751,10 +621,21 @@ CassandraBackend::writeKeys( KeyIndex const& index, bool isAsync) const { - std::atomic_uint32_t numRemaining = keys.size(); + auto bind = [this](auto& params) { + auto& [lgrSeq, key] = params.data; + CassandraStatement statement{insertKey_}; + statement.bindInt(lgrSeq); + statement.bindBytes(key.data(), 1); + statement.bindBytes(key); + return statement; + }; + std::atomic_int numOutstanding = 0; std::condition_variable cv; std::mutex mtx; - std::vector> cbs; + std::vector, + typename std::remove_reference::type>>> + cbs; cbs.reserve(keys.size()); uint32_t concurrentLimit = isAsync ? 
indexerMaxRequestsOutstanding : maxRequestsOutstanding; @@ -766,261 +647,32 @@ CassandraBackend::writeKeys( uint32_t numSubmitted = 0; for (auto& key : keys) { - cbs.push_back(std::make_shared( - *this, key, index.keyIndex, cv, mtx, numRemaining)); - writeKey(*cbs.back()); + cbs.push_back(makeAndExecuteBulkAsyncWrite( + this, + std::make_pair(index.keyIndex, std::move(key)), + bind, + numOutstanding, + mtx, + cv)); + ++numOutstanding; ++numSubmitted; - BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; std::unique_lock lck(mtx); - BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; - cv.wait(lck, [&numRemaining, numSubmitted, concurrentLimit, &keys]() { - BOOST_LOG_TRIVIAL(trace) << std::to_string(numSubmitted) << " " - << std::to_string(numRemaining) << " " - << std::to_string(keys.size()) << " " - << std::to_string(concurrentLimit); + cv.wait(lck, [&numOutstanding, concurrentLimit, &keys]() { // keys.size() - i is number submitted. keys.size() - // numRemaining is number completed Difference is num // outstanding - return (numSubmitted - (keys.size() - numRemaining)) < - concurrentLimit; + return numOutstanding < concurrentLimit; }); if (numSubmitted % 100000 == 0) BOOST_LOG_TRIVIAL(debug) - << __func__ << " Submitted " << std::to_string(numSubmitted) - << " write requests. Completed " - << (keys.size() - numRemaining); + << __func__ << " Submitted " << std::to_string(numSubmitted); } std::unique_lock lck(mtx); - cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); + cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); return true; } -bool -CassandraBackend::isIndexed(uint32_t ledgerSequence) const -{ - return false; - /* - auto rng = fetchLedgerRange(); - if (!rng) - return false; - if (ledgerSequence != rng->minSequence && - ledgerSequence != (ledgerSequence >> indexerShift_ << indexerShift_)) - ledgerSequence = ((ledgerSequence >> indexerShift_) << indexerShift_) + - (1 << indexerShift_); - CassandraStatement statement{selectKeys_}; - statement.bindInt(ledgerSequence); - ripple::uint256 zero; - statement.bindBytes(zero); - statement.bindUInt(1); - CassandraResult result = executeSyncRead(statement); - return !!result; - */ -} - -std::optional -CassandraBackend::getNextToIndex() const -{ - return {}; - /* - auto rng = fetchLedgerRange(); - if (!rng) - return {}; - uint32_t cur = rng->minSequence; - while (isIndexed(cur)) - { - cur = ((cur >> indexerShift_) << indexerShift_) + (1 << indexerShift_); - } - return cur; - */ -} - -bool -CassandraBackend::runIndexer(uint32_t ledgerSequence) const -{ - return false; - /* - auto start = std::chrono::system_clock::now(); - constexpr uint32_t limit = 2048; - std::unordered_set keys; - std::unordered_map offers; - std::unordered_map> - books; - std::optional cursor; - size_t numOffers = 0; - uint32_t base = ledgerSequence; - auto rng = fetchLedgerRange(); - if (base != rng->minSequence) - { - base = (base >> indexerShift_) << indexerShift_; - base -= (1 << indexerShift_); - if (base < rng->minSequence) - base = rng->minSequence; - } - BOOST_LOG_TRIVIAL(info) - << __func__ << " base = " << std::to_string(base) - << " next to index = " << std::to_string(ledgerSequence); - while (true) - { - try - { - auto [objects, curCursor] = fetchLedgerPage(cursor, base, limit); - BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; - cursor = curCursor; - for (auto& obj : objects) - { - if (isOffer(obj.blob)) - { - auto bookDir = getBook(obj.blob); - books[bookDir].insert(obj.key); - offers[obj.key] = 
bookDir; - ++numOffers; - } - keys.insert(std::move(obj.key)); - if (keys.size() % 100000 == 0) - BOOST_LOG_TRIVIAL(info) - << __func__ << " Fetched " - << std::to_string(keys.size()) << "keys"; - } - if (!cursor) - break; - } - catch (DatabaseTimeout const& e) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " Database timeout fetching keys"; - std::this_thread::sleep_for(std::chrono::seconds(2)); - } - } - auto mid = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Fetched all keys from ledger " << std::to_string(base) - << " . num keys = " << keys.size() << " num books = " << books.size() - << " num offers = " << numOffers << " . Took " - << (mid - start).count() / 1000000000.0; - if (base == ledgerSequence) - { - BOOST_LOG_TRIVIAL(info) << __func__ << "Writing keys"; - writeKeys(keys, ledgerSequence); - writeBooks(books, ledgerSequence, numOffers); - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Wrote all keys from ledger " - << std::to_string(ledgerSequence) << " . num keys = " << keys.size() - << " . Took " << (end - mid).count() / 1000000000.0 - << ". Entire operation took " - << (end - start).count() / 1000000000.0; - } - else - { - writeBooks(books, base, numOffers); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Wrote books. Skipping writing keys"; - } - - uint32_t prevLedgerSequence = base; - uint32_t nextLedgerSequence = - ((prevLedgerSequence >> indexerShift_) << indexerShift_); - BOOST_LOG_TRIVIAL(info) - << __func__ << " next base = " << std::to_string(nextLedgerSequence); - nextLedgerSequence += (1 << indexerShift_); - BOOST_LOG_TRIVIAL(info) - << __func__ << " next = " << std::to_string(nextLedgerSequence); - while (true) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " Processing diffs. nextLedger = " - << std::to_string(nextLedgerSequence); - auto rng = fetchLedgerRange(); - if (rng->maxSequence < nextLedgerSequence) - break; - start = std::chrono::system_clock::now(); - for (size_t i = prevLedgerSequence; i <= nextLedgerSequence; i += 256) - { - auto start2 = std::chrono::system_clock::now(); - std::unordered_map< - ripple::uint256, - std::unordered_set> - booksDeleted; - size_t numOffersDeleted = 0; - // Get the diff and update keys - std::vector objs; - std::vector sequences(256, 0); - std::iota(sequences.begin(), sequences.end(), i + 1); - - auto diffs = fetchLedgerDiffs(sequences); - for (auto const& diff : diffs) - { - for (auto const& obj : diff.second) - { - // remove deleted keys - if (obj.blob.size() == 0) - { - keys.erase(obj.key); - if (offers.count(obj.key) > 0) - { - auto book = offers[obj.key]; - if (booksDeleted[book].insert(obj.key).second) - ++numOffersDeleted; - offers.erase(obj.key); - } - } - else - { - // insert other keys. 
keys is a set, so this is a - // noop if obj.key is already in keys - keys.insert(obj.key); - // if the object is an offer, add to books - if (isOffer(obj.blob)) - { - auto book = getBook(obj.blob); - if (books[book].insert(obj.key).second) - ++numOffers; - offers[obj.key] = book; - } - } - } - } - if (sequences.back() % 256 != 0) - { - BOOST_LOG_TRIVIAL(error) - << __func__ - << " back : " << std::to_string(sequences.back()) - << " front : " << std::to_string(sequences.front()) - << " size : " << std::to_string(sequences.size()); - throw std::runtime_error( - "Last sequence is not divisible by 256"); - } - - for (auto& book : booksDeleted) - { - for (auto& offerKey : book.second) - { - if (books[book.first].erase(offerKey)) - --numOffers; - } - } - writeBooks(books, sequences.back(), numOffers); - writeBooks(booksDeleted, sequences.back(), numOffersDeleted); - auto mid = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) << __func__ << " Fetched 256 diffs. Took " - << (mid - start2).count() / 1000000000.0; - } - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Fetched all from diffs " - << std::to_string(nextLedgerSequence) - << " shift width = " << std::to_string(indexerShift_) - << ". num keys = " << keys.size() << " . Took " - << (end - start).count() / 1000000000.0 - << " prev ledger = " << std::to_string(prevLedgerSequence); - writeKeys(keys, nextLedgerSequence); - prevLedgerSequence = nextLedgerSequence; - nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); - } - return true; -*/ -} bool CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const { @@ -1028,17 +680,28 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const // ledgers close roughly every 4 seconds. We double the TTL so that way // there is a window of time to update the database, to prevent unchanging // records from being deleted. 
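[Review note] To make the TTL comment above concrete, a worked example; the close-time constant comes from the comment, the retention count is illustrative:

#include <cstdint>

// With ~4s ledger close times, doubling the TTL means a row written once
// outlives a full retention window even if online delete never re-touches it.
uint64_t
ttlForRetention(uint64_t numLedgersToKeep)
{
    constexpr uint64_t closeTimeSecs = 4;  // rough ledger close interval
    return 2 * numLedgersToKeep * closeTimeSecs;
}
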
- auto rng = fetchLedgerRangeNoThrow(); + auto rng = fetchLedgerRange(); if (!rng) return false; uint32_t minLedger = rng->maxSequence - numLedgersToKeep; if (minLedger <= rng->minSequence) return false; + auto bind = [this](auto& params) { + auto& [key, seq, obj] = params.data; + CassandraStatement statement{insertObject_}; + statement.bindBytes(key); + statement.bindInt(seq); + statement.bindBytes(obj); + return statement; + }; std::condition_variable cv; std::mutex mtx; - std::vector> cbs; + std::vector, + typename std::remove_reference::type>>> + cbs; uint32_t concurrentLimit = 10; - std::atomic_uint32_t numOutstanding = 0; + std::atomic_int numOutstanding = 0; // iterate through latest ledger, updating TTL std::optional cursor; @@ -1060,16 +723,15 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const for (auto& obj : objects) { ++numOutstanding; - cbs.push_back(std::make_shared( - *this, - std::move(obj.key), - minLedger, - std::move(obj.blob), - cv, + cbs.push_back(makeAndExecuteBulkAsyncWrite( + this, + std::make_tuple( + std::move(obj.key), minLedger, std::move(obj.blob)), + bind, + numOutstanding, mtx, - numOutstanding)); + cv)); - onlineDelete(*cbs.back()); std::unique_lock lck(mtx); BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; cv.wait(lck, [&numOutstanding, concurrentLimit]() { @@ -1386,6 +1048,14 @@ CassandraBackend::open(bool readOnly) << " WITH default_time_to_live = " << std::to_string(ttl); if (!executeSimpleStatement(query.str())) continue; + query.str(""); + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix + << "ledger_transactions" + << " ( ledger_sequence bigint, hash blob, PRIMARY " + "KEY(ledger_sequence, hash))" + << " WITH default_time_to_live = " << std::to_string(ttl); + if (!executeSimpleStatement(query.str())) + continue; query.str(""); query << "SELECT * FROM " << tablePrefix << "transactions" @@ -1407,8 +1077,8 @@ CassandraBackend::open(bool readOnly) query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys" - << " ( sequence bigint, key blob, PRIMARY KEY " - "(sequence, key))" + << " ( sequence bigint, first_byte blob, key blob, PRIMARY KEY " + "((sequence,first_byte), key))" " WITH default_time_to_live = " << std::to_string(keysTtl); if (!executeSimpleStatement(query.str())) @@ -1421,10 +1091,11 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx" - << " ( account blob, seq_idx tuple, " + << " ( account blob, idx int, seq_idx " + "tuple, " " hash blob, " "PRIMARY KEY " - "(account, seq_idx)) WITH " + "((account,idx), seq_idx)) WITH " "CLUSTERING ORDER BY (seq_idx desc)" << " AND default_time_to_live = " << std::to_string(ttl); @@ -1496,16 +1167,23 @@ CassandraBackend::open(bool readOnly) "?, ?)"; if (!insertTransaction_.prepareStatement(query, session_.get())) continue; + query.str(""); + query << "INSERT INTO " << tablePrefix << "ledger_transactions" + << " (ledger_sequence, hash) VALUES " + "(?, ?)"; + if (!insertLedgerTransaction_.prepareStatement(query, session_.get())) + continue; query.str(""); query << "INSERT INTO " << tablePrefix << "keys" - << " (sequence, key) VALUES (?, ?)"; + << " (sequence,first_byte, key) VALUES (?, ?, ?)"; if (!insertKey_.prepareStatement(query, session_.get())) continue; query.str(""); query << "SELECT key FROM " << tablePrefix << "keys" - << " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?"; + << " WHERE sequence = ? AND first_byte = ? AND key >= ? 
ORDER BY " + "key ASC LIMIT ?"; if (!selectKeys_.prepareStatement(query, session_.get())) continue; @@ -1525,14 +1203,7 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); - query << "SELECT transaction, metadata, ledger_sequence FROM " - << tablePrefix << "transactions" - << " WHERE ledger_sequence = ?"; - if (!selectAllTransactionsInLedger_.prepareStatement( - query, session_.get())) - continue; - query.str(""); - query << "SELECT hash FROM " << tablePrefix << "transactions" + query << "SELECT hash FROM " << tablePrefix << "ledger_transactions" << " WHERE ledger_sequence = ?"; if (!selectAllTransactionHashesInLedger_.prepareStatement( query, session_.get())) @@ -1563,14 +1234,15 @@ CassandraBackend::open(bool readOnly) query.str(""); query << " INSERT INTO " << tablePrefix << "account_tx" - << " (account, seq_idx, hash) " - << " VALUES (?,?,?)"; + << " (account, idx, seq_idx, hash) " + << " VALUES (?,?,?,?)"; if (!insertAccountTx_.prepareStatement(query, session_.get())) continue; query.str(""); query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx" << " WHERE account = ? " + << " AND idx = ? " << " AND seq_idx < ? LIMIT ?"; if (!selectAccountTx_.prepareStatement(query, session_.get())) continue; @@ -1622,13 +1294,6 @@ CassandraBackend::open(bool readOnly) query << " SELECT sequence FROM " << tablePrefix << "ledger_range"; if (!selectLedgerRange_.prepareStatement(query, session_.get())) continue; - /* - query.str(""); - query << " SELECT key,object FROM " << tablePrefix - << "objects WHERE sequence = ?"; - if (!selectLedgerDiff_.prepareStatement(query, session_.get())) - continue; - */ setupPreparedStatements = true; } diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index 66f24e7c..efcba311 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -26,37 +26,16 @@ #include #include #include +#include +#include #include #include #include #include #include -#include -#include namespace Backend { -void -flatMapWriteCallback(CassFuture* fut, void* cbData); -void -flatMapWriteKeyCallback(CassFuture* fut, void* cbData); -void -flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); -void -flatMapWriteBookCallback(CassFuture* fut, void* cbData); -void -flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData); -void -flatMapReadCallback(CassFuture* fut, void* cbData); -void -flatMapReadObjectCallback(CassFuture* fut, void* cbData); -void -flatMapGetCreatedCallback(CassFuture* fut, void* cbData); -void -flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData); -void -flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData); - class CassandraPreparedStatement { private: @@ -129,6 +108,15 @@ public: cass_statement_set_consistency(statement_, CASS_CONSISTENCY_QUORUM); } + CassandraStatement(CassandraStatement&& other) + { + statement_ = other.statement_; + other.statement_ = nullptr; + curBindingIndex_ = other.curBindingIndex_; + other.curBindingIndex_ = 0; + } + CassandraStatement(CassandraStatement const& other) = delete; + CassStatement* get() const { @@ -535,65 +523,6 @@ isTimeout(CassError rc) return true; return false; } -template -class CassandraAsyncResult -{ - T& requestParams_; - CassandraResult result_; - bool timedOut_ = false; - bool retryOnTimeout_ = false; - -public: - CassandraAsyncResult( - T& requestParams, - CassFuture* fut, - F retry, - bool retryOnTimeout = false) - : requestParams_(requestParams), retryOnTimeout_(retryOnTimeout) - { - CassError rc = 
cass_future_error_code(fut); - if (rc != CASS_OK) - { - // TODO - should we ever be retrying requests? These are reads, - // and they usually only fail when the db is under heavy load. Seems - // best to just return an error to the client and have the client - // try again - if (isTimeout(rc)) - timedOut_ = true; - if (!timedOut_ || retryOnTimeout_) - retry(requestParams_); - } - else - { - result_ = std::move(CassandraResult(cass_future_get_result(fut))); - } - } - - ~CassandraAsyncResult() - { - if (result_.isOk() or timedOut_) - { - BOOST_LOG_TRIVIAL(trace) << "finished a request"; - size_t batchSize = requestParams_.batchSize; - - std::unique_lock lk(requestParams_.mtx); - if (++(requestParams_.numFinished) == batchSize) - requestParams_.cv.notify_all(); - } - } - - CassandraResult& - getResult() - { - return result_; - } - - bool - timedOut() - { - return timedOut_; - } -}; class CassandraBackend : public BackendInterface { @@ -635,8 +564,8 @@ private: // than making a new statement CassandraPreparedStatement insertObject_; CassandraPreparedStatement insertTransaction_; + CassandraPreparedStatement insertLedgerTransaction_; CassandraPreparedStatement selectTransaction_; - CassandraPreparedStatement selectAllTransactionsInLedger_; CassandraPreparedStatement selectAllTransactionHashesInLedger_; CassandraPreparedStatement selectObject_; CassandraPreparedStatement selectLedgerPageKeys_; @@ -645,12 +574,6 @@ private: CassandraPreparedStatement getToken_; CassandraPreparedStatement insertKey_; CassandraPreparedStatement selectKeys_; - CassandraPreparedStatement getBook_; - CassandraPreparedStatement selectBook_; - CassandraPreparedStatement completeBook_; - CassandraPreparedStatement insertBook_; - CassandraPreparedStatement insertBook2_; - CassandraPreparedStatement deleteBook_; CassandraPreparedStatement insertAccountTx_; CassandraPreparedStatement selectAccountTx_; CassandraPreparedStatement insertLedgerHeader_; @@ -662,7 +585,6 @@ private: CassandraPreparedStatement selectLedgerByHash_; CassandraPreparedStatement selectLatestLedger_; CassandraPreparedStatement selectLedgerRange_; - CassandraPreparedStatement selectLedgerDiff_; // io_context used for exponential backoff for write retries mutable boost::asio::io_context ioContext_; @@ -700,17 +622,10 @@ public: ~CassandraBackend() override { - BOOST_LOG_TRIVIAL(info) << __func__; if (open_) close(); } - std::string - getName() - { - return "cassandra"; - } - bool isOpen() { @@ -734,27 +649,6 @@ public: } open_ = false; } - CassandraPreparedStatement const& - getInsertKeyPreparedStatement() const - { - return insertKey_; - } - CassandraPreparedStatement const& - getInsertBookPreparedStatement() const - { - return insertBook2_; - } - CassandraPreparedStatement const& - getInsertObjectPreparedStatement() const - { - return insertObject_; - } - - CassandraPreparedStatement const& - getSelectLedgerDiffPreparedStatement() const - { - return selectLedgerDiff_; - } std::pair< std::vector, @@ -762,82 +656,14 @@ public: fetchAccountTransactions( ripple::AccountID const& account, std::uint32_t limit, - std::optional const& cursor) const override - { - BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx"; - CassandraStatement statement{selectAccountTx_}; - statement.bindBytes(account); - if (cursor) - statement.bindIntTuple( - cursor->ledgerSequence, cursor->transactionIndex); - else - statement.bindIntTuple(INT32_MAX, INT32_MAX); - statement.bindUInt(limit); - CassandraResult result = executeSyncRead(statement); - if (!result.hasResult()) - { - 
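[Review note] The inline fetchAccountTransactions being removed below queried one unbounded partition per account; the new schema adds an idx column so each partition covers a bounded window of ledgers. A small sketch of the bucketing, with the shift expressions taken verbatim from this diff:

#include <cstdint>

// Floor a ledger sequence to its 2^20-ledger window; this is the new
// (account, idx) partition key component.
uint32_t
accountTxBucket(uint32_t ledgerSequence)
{
    return ledgerSequence >> 20 << 20;
}

// Paging walks back one window at a time once a bucket is exhausted.
// Callers guard against walking below the minimum sequence, as the diff
// does with rng->minSequence.
uint32_t
previousBucket(uint32_t bucket)
{
    return ((bucket >> 20) - 1) << 20;
}
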
BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned"; - return {{}, {}}; - } - - std::vector hashes; - size_t numRows = result.numRows(); - bool returnCursor = numRows == limit; - std::optional retCursor; - - BOOST_LOG_TRIVIAL(info) << "num_rows = " << std::to_string(numRows); - do - { - hashes.push_back(result.getUInt256()); - --numRows; - if (numRows == 0 && returnCursor) - { - auto [lgrSeq, txnIdx] = result.getInt64Tuple(); - retCursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx}; - } - } while (result.nextRow()); - - BOOST_LOG_TRIVIAL(debug) - << "doAccountTx - populated hashes. num hashes = " << hashes.size(); - if (hashes.size()) - { - return {fetchTransactions(hashes), retCursor}; - } - return {{}, {}}; - } - - struct WriteLedgerHeaderCallbackData - { - CassandraBackend const* backend; - uint32_t sequence; - std::string header; - uint32_t currentRetries = 0; - - std::atomic refs = 1; - WriteLedgerHeaderCallbackData( - CassandraBackend const* f, - uint32_t sequence, - std::string&& header) - : backend(f), sequence(sequence), header(std::move(header)) - { - } - }; - struct WriteLedgerHashCallbackData - { - CassandraBackend const* backend; - ripple::uint256 hash; - uint32_t sequence; - uint32_t currentRetries = 0; - - std::atomic refs = 1; - WriteLedgerHashCallbackData( - CassandraBackend const* f, - ripple::uint256 hash, - uint32_t sequence) - : backend(f), hash(hash), sequence(sequence) - { - } - }; + std::optional const& cursor) const override; + std::pair< + std::vector, + std::optional> + doFetchAccountTransactions( + ripple::AccountID const& account, + std::uint32_t limit, + std::optional const& cursor) const; bool doFinishWrites() const override @@ -872,37 +698,7 @@ public: writeLedger( ripple::LedgerInfo const& ledgerInfo, std::string&& header, - bool isFirst = false) const override - { - WriteLedgerHeaderCallbackData* headerCb = - new WriteLedgerHeaderCallbackData( - this, ledgerInfo.seq, std::move(header)); - WriteLedgerHashCallbackData* hashCb = new WriteLedgerHashCallbackData( - this, ledgerInfo.hash, ledgerInfo.seq); - writeLedgerHeader(*headerCb, false); - writeLedgerHash(*hashCb, false); - ledgerSequence_ = ledgerInfo.seq; - isFirstLedger_ = isFirst; - } - void - writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const - { - CassandraStatement statement{insertLedgerHash_}; - statement.bindBytes(cb.hash); - statement.bindInt(cb.sequence); - executeAsyncWrite( - statement, flatMapWriteLedgerHashCallback, cb, isRetry); - } - - void - writeLedgerHeader(WriteLedgerHeaderCallbackData& cb, bool isRetry) const - { - CassandraStatement statement{insertLedgerHeader_}; - statement.bindInt(cb.sequence); - statement.bindBytes(cb.header); - executeAsyncWrite( - statement, flatMapWriteLedgerHeaderCallback, cb, isRetry); - } + bool isFirst = false) const override; std::optional fetchLatestLedgerSequence() const override @@ -956,7 +752,7 @@ public: } std::optional - fetchLedgerRange() const override; + hardFetchLedgerRange() const override; std::vector fetchAllTransactionsInLedger(uint32_t ledgerSequence) const override; @@ -964,11 +760,8 @@ public: std::vector fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const override; - // Synchronously fetch the object with key key and store the result in - // pno - // @param key the key of the object - // @param pno object in which to store the result - // @return result status of query + // Synchronously fetch the object with key key, as of ledger with sequence + // sequence std::optional 
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const override @@ -983,7 +776,10 @@ public: BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows"; return {}; } - return result.getBytes(); + auto res = result.getBytes(); + if (res.size()) + return res; + return {}; } std::optional @@ -1024,18 +820,6 @@ public: std::optional const& cursor, std::uint32_t ledgerSequence, std::uint32_t limit) const override; - std::vector - fetchLedgerDiff(uint32_t ledgerSequence) const; - std::map> - fetchLedgerDiffs(std::vector const& sequences) const; - - bool - runIndexer(uint32_t ledgerSequence) const; - bool - isIndexed(uint32_t ledgerSequence) const; - - std::optional - getNextToIndex() const; bool writeKeys( @@ -1043,208 +827,15 @@ public: KeyIndex const& index, bool isAsync = false) const override; - bool - canFetchBatch() - { - return true; - } - - struct ReadCallbackData - { - CassandraBackend const& backend; - ripple::uint256 const& hash; - TransactionAndMetadata& result; - std::mutex& mtx; - std::condition_variable& cv; - - std::atomic_uint32_t& numFinished; - size_t batchSize; - - ReadCallbackData( - CassandraBackend const& backend, - ripple::uint256 const& hash, - TransactionAndMetadata& result, - std::mutex& mtx, - std::condition_variable& cv, - std::atomic_uint32_t& numFinished, - size_t batchSize) - : backend(backend) - , hash(hash) - , result(result) - , mtx(mtx) - , cv(cv) - , numFinished(numFinished) - , batchSize(batchSize) - { - } - - ReadCallbackData(ReadCallbackData const& other) = default; - }; - std::vector - fetchTransactions(std::vector const& hashes) const override - { - std::size_t const numHashes = hashes.size(); - BOOST_LOG_TRIVIAL(debug) - << "Fetching " << numHashes << " transactions from Cassandra"; - std::atomic_uint32_t numFinished = 0; - std::condition_variable cv; - std::mutex mtx; - std::vector results{numHashes}; - std::vector> cbs; - cbs.reserve(numHashes); - for (std::size_t i = 0; i < hashes.size(); ++i) - { - cbs.push_back(std::make_shared( - *this, hashes[i], results[i], mtx, cv, numFinished, numHashes)); - read(*cbs[i]); - } - assert(results.size() == cbs.size()); + fetchTransactions( + std::vector const& hashes) const override; - std::unique_lock lck(mtx); - cv.wait(lck, [&numFinished, &numHashes]() { - return numFinished == numHashes; - }); - for (auto const& res : results) - { - if (res.transaction.size() == 1 && res.transaction[0] == 0) - throw DatabaseTimeout(); - } - - BOOST_LOG_TRIVIAL(debug) - << "Fetched " << numHashes << " transactions from Cassandra"; - return results; - } - - void - read(ReadCallbackData& data) const - { - CassandraStatement statement{selectTransaction_}; - statement.bindBytes(data.hash); - executeAsyncRead(statement, flatMapReadCallback, data); - } - - struct ReadObjectCallbackData - { - CassandraBackend const& backend; - ripple::uint256 const& key; - uint32_t sequence; - Blob& result; - std::mutex& mtx; - std::condition_variable& cv; - - std::atomic_uint32_t& numFinished; - size_t batchSize; - - ReadObjectCallbackData( - CassandraBackend const& backend, - ripple::uint256 const& key, - uint32_t sequence, - Blob& result, - std::mutex& mtx, - std::condition_variable& cv, - std::atomic_uint32_t& numFinished, - size_t batchSize) - : backend(backend) - , key(key) - , sequence(sequence) - , result(result) - , mtx(mtx) - , cv(cv) - , numFinished(numFinished) - , batchSize(batchSize) - { - } - - ReadObjectCallbackData(ReadObjectCallbackData const& other) = default; - }; - - void - readObject(ReadObjectCallbackData& data) const 
std::vector<Blob> fetchLedgerObjects( std::vector<ripple::uint256> const& keys, uint32_t sequence) const override; - struct WriteCallbackData - { - CassandraBackend const* backend; - std::string key; - uint32_t sequence; - uint32_t createdSequence = 0; - std::string blob; - bool isCreated; - bool isDeleted; - std::optional<ripple::uint256> book; - - uint32_t currentRetries = 0; - std::atomic<int> refs = 1; - - WriteCallbackData( - CassandraBackend const* f, - std::string&& key, - uint32_t sequence, - std::string&& blob, - bool isCreated, - bool isDeleted, - std::optional<ripple::uint256>&& inBook) - : backend(f) - , key(std::move(key)) - , sequence(sequence) - , blob(std::move(blob)) - , isCreated(isCreated) - , isDeleted(isDeleted) - , book(std::move(inBook)) - { - } - }; - - struct WriteAccountTxCallbackData - { - CassandraBackend const* backend; - ripple::AccountID account; - uint32_t ledgerSequence; - uint32_t transactionIndex; - ripple::uint256 txHash; - - uint32_t currentRetries = 0; - std::atomic<int> refs = 1; - - WriteAccountTxCallbackData( - CassandraBackend const* f, - ripple::AccountID&& account, - uint32_t lgrSeq, - uint32_t txIdx, - ripple::uint256&& hash) - : backend(f) - , account(std::move(account)) - , ledgerSequence(lgrSeq) - , transactionIndex(txIdx) - , txHash(std::move(hash)) - { - } - }; - - void - write(WriteCallbackData& data, bool isRetry) const - { - { - CassandraStatement statement{insertObject_}; - statement.bindBytes(data.key); - statement.bindInt(data.sequence); - statement.bindBytes(data.blob); - - executeAsyncWrite(statement, flatMapWriteCallback, data, isRetry); - } - } - void doWriteLedgerObject( std::string&& key, @@ -1252,112 +843,18 @@ public: std::string&& blob, bool isCreated, bool isDeleted, - std::optional<ripple::uint256>&& book) const override - { - BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; - bool hasBook = book.has_value(); - WriteCallbackData* data = new WriteCallbackData( - this, - std::move(key), - seq, - std::move(blob), - isCreated, - isDeleted, - std::move(book)); - - write(*data, false); - } + std::optional<ripple::uint256>&& book) const override; void writeAccountTransactions( - std::vector<AccountTransactionsData>&& data) const override - { - for (auto& record : data) - { - for (auto& account : record.accounts) - { - WriteAccountTxCallbackData* cbData = - new WriteAccountTxCallbackData( - this, - std::move(account), - record.ledgerSequence, - record.transactionIndex, - std::move(record.txHash)); - writeAccountTx(*cbData, false); - } - } - } + std::vector<AccountTransactionsData>&& data) const override; - void - writeAccountTx(WriteAccountTxCallbackData& data, bool isRetry) const - { - CassandraStatement statement(insertAccountTx_); - statement.bindBytes(data.account); - statement.bindIntTuple(data.ledgerSequence, data.transactionIndex); - statement.bindBytes(data.txHash); - - executeAsyncWrite( - statement, flatMapWriteAccountTxCallback, data, isRetry); - }
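All of the Write*CallbackData structs deleted above share one ownership scheme: the request state is heap-allocated with new, starts at refs = 1, and the completion callback either re-submits the statement on failure or drops the reference and lets the state delete itself. A simplified sketch of that lifetime pattern (the retry cap and all names are mine, not the backend's):

    #include <atomic>
    #include <cstdint>
    #include <functional>
    #include <string>

    // Fire-and-forget write state: one reference per outstanding request,
    // self-deleting when the last reference is released.
    struct CallbackData
    {
        std::string payload;
        uint32_t currentRetries = 0;
        std::atomic<int> refs = 1;

        explicit CallbackData(std::string p) : payload(std::move(p))
        {
        }
    };

    // Completion-handler shape: on failure, re-submit (the pending retry
    // keeps the existing reference alive); otherwise drop the reference
    // and free the state once nothing points at it.
    void
    onWriteComplete(
        CallbackData* data,
        bool ok,
        std::function<void(CallbackData&)> const& resubmit)
    {
        constexpr uint32_t maxRetries = 5;  // hypothetical cap
        if (!ok && data->currentRetries++ < maxRetries)
        {
            resubmit(*data);
            return;
        }
        if (--data->refs == 0)
            delete data;
    }

Because the callback owns the deletion, the caller never joins or waits on individual writes; backpressure is handled elsewhere via the outstanding-request counter.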
- - struct WriteTransactionCallbackData - { - CassandraBackend const* backend; - // The shared pointer to the node object must exist until it's - // confirmed persisted. Otherwise, it can become deleted - // prematurely if other copies are removed from caches. - std::string hash; - uint32_t sequence; - std::string transaction; - std::string metadata; - - uint32_t currentRetries = 0; - - std::atomic<int> refs = 1; - WriteTransactionCallbackData( - CassandraBackend const* f, - std::string&& hash, - uint32_t sequence, - std::string&& transaction, - std::string&& metadata) - : backend(f) - , hash(std::move(hash)) - , sequence(sequence) - , transaction(std::move(transaction)) - , metadata(std::move(metadata)) - { - } - }; - - void - writeTransaction(WriteTransactionCallbackData& data, bool isRetry) const - { - CassandraStatement statement{insertTransaction_}; - statement.bindBytes(data.hash); - statement.bindInt(data.sequence); - statement.bindBytes(data.transaction); - statement.bindBytes(data.metadata); - - executeAsyncWrite( - statement, flatMapWriteTransactionCallback, data, isRetry); - } void writeTransaction( std::string&& hash, uint32_t seq, std::string&& transaction, - std::string&& metadata) const override - { - BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; - WriteTransactionCallbackData* data = new WriteTransactionCallbackData( - this, - std::move(hash), - seq, - std::move(transaction), - std::move(metadata)); - - writeTransaction(*data, false); - } + std::string&& metadata) const override; void startWrites() const override @@ -1380,28 +877,6 @@ public: return ioContext_; } - friend void - flatMapWriteCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteKeyCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteBookCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData); - - friend void - flatMapReadCallback(CassFuture* fut, void* cbData); - friend void - flatMapReadObjectCallback(CassFuture* fut, void* cbData); - friend void - flatMapGetCreatedCallback(CassFuture* fut, void* cbData); - inline void incremementOutstandingRequestCount() const { diff --git a/src/backend/PostgresBackend.cpp b/src/backend/PostgresBackend.cpp index f4ba7c28..1e1f206f 100644 --- a/src/backend/PostgresBackend.cpp +++ b/src/backend/PostgresBackend.cpp @@ -214,7 +214,7 @@ PostgresBackend::fetchLedgerByHash(ripple::uint256 const& hash) const } std::optional<LedgerRange> -PostgresBackend::fetchLedgerRange() const +PostgresBackend::hardFetchLedgerRange() const { auto range = PgQuery(pgPool_)("SELECT complete_ledgers()"); if (!range) @@ -729,7 +729,7 @@ PostgresBackend::writeKeys( bool PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const { - auto rng = fetchLedgerRangeNoThrow(); + auto rng = fetchLedgerRange(); if (!rng) return false; uint32_t minLedger = rng->maxSequence - numLedgersToKeep; diff --git a/src/backend/PostgresBackend.h b/src/backend/PostgresBackend.h index 01269cfc..28e32a6f 100644 --- a/src/backend/PostgresBackend.h +++ b/src/backend/PostgresBackend.h @@ -30,9 +30,6 @@ public: std::optional<ripple::LedgerInfo> fetchLedgerByHash(ripple::uint256 const& hash) const override; - std::optional<LedgerRange> - fetchLedgerRange() const override; - std::optional<Blob> fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const override; @@ -47,6 +44,9 @@ public: std::vector<ripple::uint256> fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const override; + std::optional<LedgerRange> + hardFetchLedgerRange() const override; + LedgerPage doFetchLedgerPage( std::optional<ripple::uint256> const& cursor,
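hardFetchLedgerRange above is a straight SELECT complete_ledgers(); in the reporting Postgres schema that function returns a text range which then has to be split into a min and a max sequence. A sketch of the parse, under my assumption about the returned format (e.g. "empty", "1500", or "2-1500"; the real backend may differ in detail):

    #include <cstdint>
    #include <optional>
    #include <string>

    struct LedgerRange
    {
        std::uint32_t minSequence;
        std::uint32_t maxSequence;
    };

    // Parse the complete_ledgers() text into a range; nullopt when the
    // database holds no complete ledgers yet.
    std::optional<LedgerRange> parseCompleteLedgers(std::string const& s)
    {
        if (s == "empty")
            return std::nullopt;
        auto dash = s.find('-');
        if (dash == std::string::npos)
        {
            auto seq = static_cast<std::uint32_t>(std::stoul(s));
            return LedgerRange{seq, seq};
        }
        return LedgerRange{
            static_cast<std::uint32_t>(std::stoul(s.substr(0, dash))),
            static_cast<std::uint32_t>(std::stoul(s.substr(dash + 1)))};
    }

Note that doOnlineDelete now reads the cached fetchLedgerRange() rather than re-querying, so the subtraction rng->maxSequence - numLedgersToKeep operates on the in-memory view of the range.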
diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 50975957..8626525f 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -26,12 +26,11 @@ #include #include #include -#include #include #include -#include #include #include +#include namespace detail { /// Convenience function for printing out basic ledger info @@ -90,7 +89,7 @@ std::optional ReportingETL::loadInitialLedger(uint32_t startingSequence) { // check that database is actually empty - auto rng = backend_->fetchLedgerRangeNoThrow(); + auto rng = backend_->hardFetchLedgerRangeNoThrow(); if (rng) { BOOST_LOG_TRIVIAL(fatal) << __func__ << " : " @@ -115,10 +114,13 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) << "Deserialized ledger header. " << detail::toString(lgrInfo); backend_->startWrites(); + BOOST_LOG_TRIVIAL(debug) << __func__ << " started writes"; backend_->writeLedger( lgrInfo, std::move(*ledgerData->mutable_ledger_header()), true); + BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote ledger"; std::vector<AccountTransactionsData> accountTxData = insertTransactions(lgrInfo, *ledgerData); + BOOST_LOG_TRIVIAL(debug) << __func__ << " inserted txns"; auto start = std::chrono::system_clock::now(); @@ -127,6 +129,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) // consumes from the queue and inserts the data into the Ledger object. // Once the below call returns, all data has been pushed into the queue loadBalancer_->loadInitialLedger(startingSequence); + BOOST_LOG_TRIVIAL(debug) << __func__ << " loaded initial ledger"; if (!stopping_) { @@ -141,12 +144,13 @@ void ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) -{ - auto ledgerRange = backend_->fetchLedgerRangeNoThrow(); +{ + backend_->updateRange(lgrInfo.seq); + auto ledgerRange = backend_->fetchLedgerRange(); std::optional<ripple::Fees> fees; - std::vector<Backend::TransactionAndMetadata> transactions; - for(;;) + std::vector<Backend::TransactionAndMetadata> transactions; + for (;;) { try { @@ -189,7 +193,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) { try { - auto range = backend_->fetchLedgerRangeNoThrow(); + auto range = backend_->hardFetchLedgerRangeNoThrow(); if (!range || range->maxSequence < ledgerSequence) { @@ -395,7 +399,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) << "Starting etl pipeline"; writing_ = true; - auto rng = backend_->hardFetchLedgerRangeNoThrow(); if (!rng || rng->maxSequence != startSequence - 1) { assert(false); @@ -497,6 +501,31 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) beast::setCurrentThreadName("rippled: ReportingETL transform"); uint32_t currentSequence = startSequence; + int counter = 0; + std::atomic_int per = 100; + auto startTimer = [this, &per]() { + auto innerFunc = [this, &per](auto& f) -> void { + std::shared_ptr<boost::asio::steady_timer> timer = + std::make_shared<boost::asio::steady_timer>( + ioContext_, + std::chrono::steady_clock::now() + + std::chrono::minutes(5)); + timer->async_wait( + [timer, f, &per](const boost::system::error_code& error) { + ++per; + BOOST_LOG_TRIVIAL(info) + << "Incremented per to " << std::to_string(per); + if (per > 100) + per = 100; + f(f); + }); + }; + innerFunc(innerFunc); + }; + // startTimer();
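The startTimer helper above re-arms itself by handing the lambda to its own completion handler: the handler captures the shared_ptr so the steady_timer outlives async_wait, then calls f(f) to schedule the next tick. The same pattern in isolation (interval shortened; names are illustrative, not the ETL's API):

    #include <boost/asio.hpp>
    #include <atomic>
    #include <chrono>
    #include <memory>

    // Self-rescheduling timer: the handler keeps the timer alive by
    // capturing it, and keeps the recursion alive by capturing a copy
    // of the lambda and invoking f(f) again.
    void startPeriodic(boost::asio::io_context& ioc, std::atomic_int& per)
    {
        auto innerFunc = [&ioc, &per](auto& f) -> void {
            auto timer = std::make_shared<boost::asio::steady_timer>(
                ioc,
                std::chrono::steady_clock::now() + std::chrono::seconds(1));
            timer->async_wait(
                [timer, f, &per](boost::system::error_code const&) {
                    if (++per > 100)
                        per = 100;
                    f(f);
                });
        };
        innerFunc(innerFunc);
    }

Passing innerFunc to itself is what lets a lambda recurse without std::function; the copy captured by each handler keeps the chain alive after startPeriodic returns.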
+ + auto begin = std::chrono::system_clock::now(); + while (!writeConflict) { std::optional<org::xrpl::rpc::v1::GetLedgerResponse> fetchResponse{ @@ -548,11 +577,24 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) BOOST_LOG_TRIVIAL(info) << "Running online delete"; backend_->doOnlineDelete(*onlineDeleteInterval_); BOOST_LOG_TRIVIAL(info) << "Finished online delete"; - auto rng = backend_->fetchLedgerRangeNoThrow(); + auto rng = backend_->fetchLedgerRange(); minSequence = rng->minSequence; deleting_ = false; }); } + /* + if (++counter >= per) + { + std::chrono::milliseconds sleep = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::seconds(4) - (end - begin)); + BOOST_LOG_TRIVIAL(info) << "Sleeping for " << sleep.count() + << " . per = " << std::to_string(per); + std::this_thread::sleep_for(sleep); + counter = 0; + begin = std::chrono::system_clock::now(); + } + */ } }}; diff --git a/src/handlers/methods/impl/AccountTx.cpp b/src/handlers/methods/impl/AccountTx.cpp index 2b61dc2e..62aea0a0 100644 --- a/src/handlers/methods/impl/AccountTx.cpp +++ b/src/handlers/methods/impl/AccountTx.cpp @@ -17,13 +17,12 @@ */ //============================================================================== -#include -#include #include #include +#include +#include -namespace RPC -{ +namespace RPC { Result doAccountTx(Context const& context) @@ -31,24 +30,24 @@ doAccountTx(Context const& context) auto request = context.params; boost::json::object response = {}; - if(!request.contains("account")) + if (!request.contains("account")) return Status{Error::rpcINVALID_PARAMS, "missingAccount"}; - if(!request.at("account").is_string()) + if (!request.at("account").is_string()) return Status{Error::rpcINVALID_PARAMS, "accountNotString"}; - - auto accountID = + + auto accountID = accountFromStringStrict(request.at("account").as_string().c_str()); if (!accountID) return Status{Error::rpcINVALID_PARAMS, "malformedAccount"}; bool binary = false; - if(request.contains("binary")) + if (request.contains("binary")) { - if(!request.at("binary").is_bool()) + if (!request.at("binary").is_bool()) return Status{Error::rpcINVALID_PARAMS, "binaryFlagNotBool"}; - + binary = request.at("binary").as_bool(); } @@ -64,15 +63,16 @@ doAccountTx(Context const& context) std::optional<Backend::AccountTransactionsCursor> cursor; cursor = {context.range.maxSequence, 0}; - if (request.contains("cursor")) + if (request.contains("marker")) { - auto const& obj = request.at("cursor").as_object(); + auto const& obj = request.at("marker").as_object(); std::optional<std::uint32_t> transactionIndex = {}; if (obj.contains("seq")) { if (!obj.at("seq").is_int64()) - return Status{Error::rpcINVALID_PARAMS, "transactionIndexNotInt"}; + return Status{ + Error::rpcINVALID_PARAMS, "transactionIndexNotInt"}; transactionIndex = value_to<std::uint32_t>(obj.at("seq")); } @@ -81,9 +81,9 @@ doAccountTx(Context const& context) if (obj.contains("ledger")) { if (!obj.at("ledger").is_int64()) - return Status{Error::rpcINVALID_PARAMS, "transactionIndexNotInt"}; + return Status{Error::rpcINVALID_PARAMS, "ledgerIndexNotInt"}; - transactionIndex = value_to<std::uint32_t>(obj.at("ledger")); + ledgerIndex = value_to<std::uint32_t>(obj.at("ledger")); } if (!transactionIndex || !ledgerIndex) @@ -107,7 +107,7 @@ doAccountTx(Context const& context) std::uint32_t limit = 200; if (request.contains("limit")) { - if(!request.at("limit").is_int64()) + if (!request.at("limit").is_int64()) return Status{Error::rpcINVALID_PARAMS, "limitNotInt"}; limit = request.at("limit").as_int64(); @@ -117,14 +117,15 @@ doAccountTx(Context const& context) response["limit"] = limit; } - boost::json::array txns; auto start = std::chrono::system_clock::now(); auto [blobs, retCursor] = context.backend->fetchAccountTransactions(*accountID, limit, cursor); auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) << __func__ << " db fetch took " << ((end -
start).count() / 1000000000.0) << " num blobs = " << blobs.size(); + BOOST_LOG_TRIVIAL(info) << __func__ << " db fetch took " + << ((end - start).count() / 1000000000.0) + << " num blobs = " << blobs.size(); response["account"] = ripple::to_string(*accountID); response["ledger_index_min"] = minIndex; @@ -132,6 +133,7 @@ doAccountTx(Context const& context) if (retCursor) { + BOOST_LOG_TRIVIAL(debug) << "setting json cursor"; boost::json::object cursorJson; cursorJson["ledger"] = retCursor->ledgerSequence; cursorJson["seq"] = retCursor->transactionIndex; @@ -157,7 +159,6 @@ doAccountTx(Context const& context) obj["tx"] = toJson(*txn); obj["tx"].as_object()["ledger_index"] = txnPlusMeta.ledgerSequence; obj["tx"].as_object()["inLedger"] = txnPlusMeta.ledgerSequence; - } else { @@ -174,9 +175,10 @@ doAccountTx(Context const& context) response["transactions"] = txns; auto end2 = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) << __func__ << " serialization took " << ((end2 - end).count() / 1000000000.0); - + BOOST_LOG_TRIVIAL(info) << __func__ << " serialization took " + << ((end2 - end).count() / 1000000000.0); + return response; } -} // namespace RPC \ No newline at end of file +} // namespace RPC diff --git a/src/webserver/HttpBase.h b/src/webserver/HttpBase.h index ae861325..c6619fc7 100644 --- a/src/webserver/HttpBase.h +++ b/src/webserver/HttpBase.h @@ -37,8 +37,8 @@ #include #include -#include #include +#include namespace http = boost::beast::http; namespace net = boost::asio; @@ -93,9 +93,9 @@ handle_request( std::string const& ip) { auto const httpResponse = [&req]( - http::status status, - std::string content_type, - std::string message) { + http::status status, + std::string content_type, + std::string message) { http::response<http::string_body> res{status, req.version()}; res.set(http::field::server, "xrpl-reporting-server-v0.0.0"); res.set(http::field::content_type, content_type); @@ -119,8 +119,7 @@ handle_request( return send(httpResponse( http::status::ok, "application/json", - boost::json::serialize( - RPC::make_error(RPC::Error::rpcSLOW_DOWN)))); + boost::json::serialize(RPC::make_error(RPC::Error::rpcSLOW_DOWN)))); try { @@ -155,14 +154,9 @@ handle_request( "application/json", boost::json::serialize( RPC::make_error(RPC::Error::rpcNOT_READY)))); - - std::optional<RPC::Context> context = RPC::make_HttpContext( - request, - backend, - nullptr, - balancer, - *range - ); + + std::optional<RPC::Context> context = + RPC::make_HttpContext(request, backend, nullptr, balancer, *range); if (!context) return send(httpResponse( @@ -170,7 +164,7 @@ handle_request( "application/json", boost::json::serialize( RPC::make_error(RPC::Error::rpcBAD_SYNTAX)))); - + boost::json::object response{{"result", boost::json::object{}}}; boost::json::object& result = response["result"].as_object(); @@ -198,10 +192,8 @@ handle_request( if (!dosGuard.add(ip, responseStr.size())) result["warning"] = "Too many requests"; - return send(httpResponse( - http::status::ok, - "application/json", - responseStr)); + return send( + httpResponse(http::status::ok, "application/json", responseStr)); } catch (std::exception const& e) {
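handle_request above consults the DOSGuard twice: up front, where an over-budget client is answered with rpcSLOW_DOWN, and after serialization, where dosGuard.add(ip, responseStr.size()) charges the response and a false return becomes the "Too many requests" warning. A minimal per-IP byte-budget guard with that shape might look like this (isOk and the constructor are my reconstruction from the call sites, not the real class):

    #include <cstddef>
    #include <mutex>
    #include <string>
    #include <unordered_map>

    // Per-IP byte budget, reconstructed from the handle_request call sites.
    class DOSGuard
    {
        std::unordered_map<std::string, std::size_t> bytesServed_;
        std::size_t const maxBytes_;
        std::mutex mtx_;

    public:
        explicit DOSGuard(std::size_t maxBytes) : maxBytes_(maxBytes)
        {
        }

        // Has this client stayed under its budget so far?
        bool isOk(std::string const& ip)
        {
            std::lock_guard<std::mutex> lk(mtx_);
            auto it = bytesServed_.find(ip);
            return it == bytesServed_.end() || it->second < maxBytes_;
        }

        // Charge a response against the budget; false once over the
        // limit, surfaced to the client as a warning field.
        bool add(std::string const& ip, std::size_t bytes)
        {
            std::lock_guard<std::mutex> lk(mtx_);
            auto& total = bytesServed_[ip];
            total += bytes;
            return total <= maxBytes_;
        }
    };

Charging by response size rather than request count matches the call site: large account_tx pages consume the budget faster than cheap server_info polls.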
diff --git a/test.py b/test.py index 9b821a46..5f8e6da8 100755 --- a/test.py +++ b/test.py @@ -219,7 +219,7 @@ async def account_tx(ip, port, account, binary, minLedger=None, maxLedger=None): await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200,"ledger_index_min":minLedger, "ledger_index_max":maxLedger})) res = json.loads(await ws.recv()) - #print(json.dumps(res,indent=4,sort_keys=True)) + print(json.dumps(res,indent=4,sort_keys=True)) return res except websockets.exceptions.ConnectionClosedError as e: print(e) @@ -322,6 +322,8 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No req["ledger_index_min"] = minLedger req["ledger_index_max"] = maxLedger start = datetime.datetime.now().timestamp() + print("sending") + print(req) await ws.send(json.dumps(req)) res = await ws.recv() @@ -329,7 +331,8 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No print(end - start) res = json.loads(res) - #print(json.dumps(res,indent=4,sort_keys=True)) + #print(res) + print(json.dumps(res,indent=4,sort_keys=True)) if "result" in res: print(len(res["result"]["transactions"])) else: @@ -341,15 +344,15 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No if "cursor" in res: cursor = {"ledger_sequence":res["cursor"]["ledger_sequence"],"transaction_index":res["cursor"]["transaction_index"]} print(cursor) + if "marker" in res: + marker = {"ledger":res["marker"]["ledger"],"seq":res["marker"]["seq"]} + print(marker) elif "result" in res and "marker" in res["result"]: marker={"ledger":res["result"]["marker"]["ledger"],"seq":res["result"]["marker"]["seq"]} print(marker) else: - print(res) + print("no cursor or marker") break - if numCalls > numPages: - print("breaking") - break return results except websockets.exceptions.ConnectionClosedError as e: print(e) @@ -665,7 +668,8 @@ def getHashes(res): res = res["result"]["ledger"] hashes = [] - for x in res["transactions"]: + for x in res["ledger"]["transactions"]: + print(x) if "hash" in x: hashes.append(x["hash"]) elif "transaction" in x and "hash" in x["transaction"]: @@ -699,11 +703,14 @@ async def ledgers(ip, port, minLedger, maxLedger, transactions, expand, maxCalls start = datetime.datetime.now().timestamp() await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":True, "transactions":bool(transactions),"expand":bool(expand)})) res = json.loads(await ws.recv()) - print(res["header"]["blob"]) end = datetime.datetime.now().timestamp() if (end - start) > 0.1: - print("request took more than 100ms") + print("request took more than 100ms : " + str(end - start)) numCalls = numCalls + 1 + if "error" in res: + print(res["error"]) + else: + print(res["header"]["blob"]) except websockets.exceptions.ConnectionClosedError as e: print(e) @@ -842,8 +849,13 @@ args = parser.parse_args() def run(args): asyncio.set_event_loop(asyncio.new_event_loop()) - if(args.ledger is None): - args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))[1] + rng = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port)) + if args.ledger is None: + args.ledger = rng[1] + if args.maxLedger == -1: + args.maxLedger = rng[1] + if args.minLedger == -1: + args.minLedger = rng[0] if args.action == "fee": asyncio.get_event_loop().run_until_complete(fee(args.ip, args.port)) elif args.action == "server_info": @@ -891,6 +903,7 @@ def run(args): end = datetime.datetime.now().timestamp() num = int(args.numRunners) * int(args.numCalls) print("Completed " + str(num) + " in " + str(end - start) + " seconds. 
Throughput = " + str(num / (end - start)) + " calls per second") + print("Latency = " + str((end - start) / int(args.numCalls)) + " seconds") elif args.action == "ledger_entries": keys = [] ledger_index = 0 @@ -1012,7 +1025,8 @@ def run(args): args.hash = getHashes(asyncio.get_event_loop().run_until_complete(ledger(args.ip,args.port,args.ledger,False,True,False)))[0] res = asyncio.get_event_loop().run_until_complete(tx(args.ip,args.port,args.hash,False)) - args.account = res["transaction"]["Account"] + print(res) + args.account = res["Account"] res = asyncio.get_event_loop().run_until_complete( account_tx(args.ip, args.port, args.account, args.binary)) @@ -1031,16 +1045,16 @@ def run(args): args.hash = getHashes(asyncio.get_event_loop().run_until_complete(ledger(args.ip,args.port,args.ledger,False,True,False)))[0] res = asyncio.get_event_loop().run_until_complete(tx(args.ip,args.port,args.hash,False)) - args.account = res["transaction"]["Account"] + args.account = res["Account"] print("starting") res = asyncio.get_event_loop().run_until_complete( - account_tx_full(args.ip, args.port, args.account, args.binary,None,None,int(args.numPages))) + account_tx_full(args.ip, args.port, args.account, args.binary,None,None)) rng = getMinAndMax(res) print(len(res["transactions"])) print(args.account) txs = set() for x in res["transactions"]: - txs.add((x["transaction"],x["ledger_sequence"])) + txs.add((x["tx_blob"],x["ledger_index"])) print(len(txs)) if args.verify: