From 3ce651ef5280b677884e8a22a319491ab7804778 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 16 Feb 2021 13:09:38 -0500 Subject: [PATCH] two ledger data implementations --- handlers/LedgerData.cpp | 23 ++++++-- reporting/ReportingBackend.cpp | 32 ++++++++++ reporting/ReportingBackend.h | 103 +++++++++++++++++++++++++++++++++ 3 files changed, 153 insertions(+), 5 deletions(-) diff --git a/handlers/LedgerData.cpp b/handlers/LedgerData.cpp index 5df827cc..017077ab 100644 --- a/handlers/LedgerData.cpp +++ b/handlers/LedgerData.cpp @@ -50,11 +50,19 @@ doLedgerData( request.contains("binary") ? request.at("binary").as_bool() : false; size_t limit = request.contains("limit") ? request.at("limit").as_int64() : (binary ? 2048 : 256); + std::pair< + std::vector, + std::optional> + resultsPair; auto start = std::chrono::system_clock::now(); - auto [results, returnedMarker] = - backend.doUpperBound(marker, ledger, limit); - BOOST_LOG_TRIVIAL(debug) - << "doUpperBound returned " << results.size() << " results"; + if (request.contains("version")) + { + resultsPair = backend.doUpperBound2(marker, ledger, limit); + } + else + { + resultsPair = backend.doUpperBound(marker, ledger, limit); + } auto end = std::chrono::system_clock::now(); @@ -62,6 +70,11 @@ doLedgerData( std::chrono::duration_cast(end - start) .count(); boost::json::array objects; + std::vector& results = + resultsPair.first; + std::optional& returnedMarker = resultsPair.second; + BOOST_LOG_TRIVIAL(debug) + << "doUpperBound returned " << results.size() << " results"; for (auto const& [key, object] : results) { ripple::STLedgerEntry sle{ @@ -82,7 +95,7 @@ doLedgerData( response["num_results"] = results.size(); response["db_time"] = time; - + response["time_per_result"] = time / results.size(); return response; } diff --git a/reporting/ReportingBackend.cpp b/reporting/ReportingBackend.cpp index 9a9986cb..11090053 100644 --- a/reporting/ReportingBackend.cpp +++ b/reporting/ReportingBackend.cpp @@ -1266,6 +1266,38 @@ CassandraFlatMapBackend::open() // Get the prepared object from the future upperBound_ = cass_future_get_prepared(prepare_future); + // The future can be freed immediately after getting the prepared + // object + // + cass_future_free(prepare_future); + + query = {}; + query << "SELECT filterempty(key,object) FROM " << tableName << "flat " + << " WHERE TOKEN(key) >= ? and sequence <= ?" + << " PER PARTITION LIMIT 1 LIMIT ?" + << " ALLOW FILTERING"; + + prepare_future = + cass_session_prepare(session_.get(), query.str().c_str()); + + // Wait for the statement to prepare and get the result + rc = cass_future_error_code(prepare_future); + + if (rc != CASS_OK) + { + // Handle error + cass_future_free(prepare_future); + + std::stringstream ss; + ss << "nodestore: Error preparing upperBound : " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << ss.str() << " : " << query.str(); + continue; + } + + // Get the prepared object from the future + upperBound2_ = cass_future_get_prepared(prepare_future); + // The future can be freed immediately after getting the prepared // object // diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h index f32529a9..942e8e9a 100644 --- a/reporting/ReportingBackend.h +++ b/reporting/ReportingBackend.h @@ -96,6 +96,7 @@ private: const CassPrepared* selectTransaction_ = nullptr; const CassPrepared* selectObject_ = nullptr; const CassPrepared* upperBound_ = nullptr; + const CassPrepared* upperBound2_ = nullptr; const CassPrepared* getToken_ = nullptr; const CassPrepared* insertKey_ = nullptr; const CassPrepared* getCreated_ = nullptr; @@ -916,6 +917,108 @@ public: std::vector blob; }; std::pair, std::optional> + doUpperBound2( + std::optional marker, + std::uint32_t seq, + std::uint32_t limit) const + { + BOOST_LOG_TRIVIAL(debug) << "Starting doUpperBound2"; + CassStatement* statement = cass_prepared_bind(upperBound2_); + cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + int64_t markerVal = marker ? marker.value() : INT64_MIN; + + CassError rc = cass_statement_bind_int64(statement, 0, markerVal); + if (rc != CASS_OK) + { + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) + << "Binding Cassandra hash to doUpperBound query: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + + rc = cass_statement_bind_int64(statement, 1, seq); + if (rc != CASS_OK) + { + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) + << "Binding Cassandra seq to doUpperBound query: " << rc << ", " + << cass_error_desc(rc); + return {}; + } + + rc = cass_statement_bind_int32(statement, 2, limit); + if (rc != CASS_OK) + { + cass_statement_free(statement); + BOOST_LOG_TRIVIAL(error) + << "Binding Cassandra limit to doUpperBound query: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + + CassFuture* fut; + do + { + fut = cass_session_execute(session_.get(), statement); + rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Cassandra fetch error"; + ss << ", retrying"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + } while (rc != CASS_OK); + + CassResult const* res = cass_future_get_result(fut); + cass_statement_free(statement); + cass_future_free(fut); + + BOOST_LOG_TRIVIAL(debug) << "doUpperBound - got keys"; + std::vector results; + + CassIterator* iter = cass_iterator_from_result(res); + while (cass_iterator_next(iter)) + { + CassRow const* row = cass_iterator_get_row(iter); + + { + CassValue const* tup = cass_row_get_column(row, 0); + CassIterator* tupleIter = cass_iterator_from_tuple(tup); + if (!tupleIter) + continue; + cass_iterator_next(tupleIter); + CassValue const* keyVal = cass_iterator_get_value(tupleIter); + cass_iterator_next(tupleIter); + CassValue const* objectVal = cass_iterator_get_value(tupleIter); + cass_byte_t const* outData; + std::size_t outSize; + cass_value_get_bytes(keyVal, &outData, &outSize); + LedgerObject result; + result.key = ripple::uint256::fromVoid(outData); + cass_value_get_bytes(objectVal, &outData, &outSize); + std::vector blob{outData, outData + outSize}; + result.blob = std::move(blob); + results.push_back(std::move(result)); + cass_iterator_free(tupleIter); + } + } + cass_iterator_free(iter); + cass_result_free(res); + BOOST_LOG_TRIVIAL(debug) + << "doUpperBound2 - populated results. num results = " + << results.size(); + if (results.size()) + { + auto token = getToken(results[results.size() - 1].key.data()); + assert(token); + return std::make_pair(results, token); + } + return {{}, {}}; + } + std::pair, std::optional> doUpperBound( std::optional marker, std::uint32_t seq,