two ledger data implementations

This commit is contained in:
CJ Cobb
2021-02-16 13:09:38 -05:00
parent 6bda1af3e6
commit 3ce651ef52
3 changed files with 153 additions and 5 deletions

View File

@@ -50,11 +50,19 @@ doLedgerData(
request.contains("binary") ? request.at("binary").as_bool() : false; request.contains("binary") ? request.at("binary").as_bool() : false;
size_t limit = request.contains("limit") ? request.at("limit").as_int64() size_t limit = request.contains("limit") ? request.at("limit").as_int64()
: (binary ? 2048 : 256); : (binary ? 2048 : 256);
std::pair<
std::vector<CassandraFlatMapBackend::LedgerObject>,
std::optional<int64_t>>
resultsPair;
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
auto [results, returnedMarker] = if (request.contains("version"))
backend.doUpperBound(marker, ledger, limit); {
BOOST_LOG_TRIVIAL(debug) resultsPair = backend.doUpperBound2(marker, ledger, limit);
<< "doUpperBound returned " << results.size() << " results"; }
else
{
resultsPair = backend.doUpperBound(marker, ledger, limit);
}
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
@@ -62,6 +70,11 @@ doLedgerData(
std::chrono::duration_cast<std::chrono::microseconds>(end - start) std::chrono::duration_cast<std::chrono::microseconds>(end - start)
.count(); .count();
boost::json::array objects; boost::json::array objects;
std::vector<CassandraFlatMapBackend::LedgerObject>& results =
resultsPair.first;
std::optional<int64_t>& returnedMarker = resultsPair.second;
BOOST_LOG_TRIVIAL(debug)
<< "doUpperBound returned " << results.size() << " results";
for (auto const& [key, object] : results) for (auto const& [key, object] : results)
{ {
ripple::STLedgerEntry sle{ ripple::STLedgerEntry sle{
@@ -82,7 +95,7 @@ doLedgerData(
response["num_results"] = results.size(); response["num_results"] = results.size();
response["db_time"] = time; response["db_time"] = time;
response["time_per_result"] = time / results.size();
return response; return response;
} }

View File

@@ -1266,6 +1266,38 @@ CassandraFlatMapBackend::open()
// Get the prepared object from the future // Get the prepared object from the future
upperBound_ = cass_future_get_prepared(prepare_future); upperBound_ = cass_future_get_prepared(prepare_future);
// The future can be freed immediately after getting the prepared
// object
//
cass_future_free(prepare_future);
query = {};
query << "SELECT filterempty(key,object) FROM " << tableName << "flat "
<< " WHERE TOKEN(key) >= ? and sequence <= ?"
<< " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing upperBound : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str() << " : " << query.str();
continue;
}
// Get the prepared object from the future
upperBound2_ = cass_future_get_prepared(prepare_future);
// The future can be freed immediately after getting the prepared // The future can be freed immediately after getting the prepared
// object // object
// //

View File

@@ -96,6 +96,7 @@ private:
const CassPrepared* selectTransaction_ = nullptr; const CassPrepared* selectTransaction_ = nullptr;
const CassPrepared* selectObject_ = nullptr; const CassPrepared* selectObject_ = nullptr;
const CassPrepared* upperBound_ = nullptr; const CassPrepared* upperBound_ = nullptr;
const CassPrepared* upperBound2_ = nullptr;
const CassPrepared* getToken_ = nullptr; const CassPrepared* getToken_ = nullptr;
const CassPrepared* insertKey_ = nullptr; const CassPrepared* insertKey_ = nullptr;
const CassPrepared* getCreated_ = nullptr; const CassPrepared* getCreated_ = nullptr;
@@ -916,6 +917,108 @@ public:
std::vector<unsigned char> blob; std::vector<unsigned char> blob;
}; };
std::pair<std::vector<LedgerObject>, std::optional<int64_t>> std::pair<std::vector<LedgerObject>, std::optional<int64_t>>
doUpperBound2(
std::optional<int64_t> marker,
std::uint32_t seq,
std::uint32_t limit) const
{
BOOST_LOG_TRIVIAL(debug) << "Starting doUpperBound2";
CassStatement* statement = cass_prepared_bind(upperBound2_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
int64_t markerVal = marker ? marker.value() : INT64_MIN;
CassError rc = cass_statement_bind_int64(statement, 0, markerVal);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra hash to doUpperBound query: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
rc = cass_statement_bind_int64(statement, 1, seq);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra seq to doUpperBound query: " << rc << ", "
<< cass_error_desc(rc);
return {};
}
rc = cass_statement_bind_int32(statement, 2, limit);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra limit to doUpperBound query: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
CassFuture* fut;
do
{
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
} while (rc != CASS_OK);
CassResult const* res = cass_future_get_result(fut);
cass_statement_free(statement);
cass_future_free(fut);
BOOST_LOG_TRIVIAL(debug) << "doUpperBound - got keys";
std::vector<LedgerObject> results;
CassIterator* iter = cass_iterator_from_result(res);
while (cass_iterator_next(iter))
{
CassRow const* row = cass_iterator_get_row(iter);
{
CassValue const* tup = cass_row_get_column(row, 0);
CassIterator* tupleIter = cass_iterator_from_tuple(tup);
if (!tupleIter)
continue;
cass_iterator_next(tupleIter);
CassValue const* keyVal = cass_iterator_get_value(tupleIter);
cass_iterator_next(tupleIter);
CassValue const* objectVal = cass_iterator_get_value(tupleIter);
cass_byte_t const* outData;
std::size_t outSize;
cass_value_get_bytes(keyVal, &outData, &outSize);
LedgerObject result;
result.key = ripple::uint256::fromVoid(outData);
cass_value_get_bytes(objectVal, &outData, &outSize);
std::vector<unsigned char> blob{outData, outData + outSize};
result.blob = std::move(blob);
results.push_back(std::move(result));
cass_iterator_free(tupleIter);
}
}
cass_iterator_free(iter);
cass_result_free(res);
BOOST_LOG_TRIVIAL(debug)
<< "doUpperBound2 - populated results. num results = "
<< results.size();
if (results.size())
{
auto token = getToken(results[results.size() - 1].key.data());
assert(token);
return std::make_pair(results, token);
}
return {{}, {}};
}
std::pair<std::vector<LedgerObject>, std::optional<int64_t>>
doUpperBound( doUpperBound(
std::optional<int64_t> marker, std::optional<int64_t> marker,
std::uint32_t seq, std::uint32_t seq,