created backend interface. Cassandra derives from it

This commit is contained in:
CJ Cobb
2021-02-23 14:28:38 -05:00
parent 3ce651ef52
commit 5534d9c75e
12 changed files with 132 additions and 223 deletions

View File

@@ -93,7 +93,7 @@ doAccountInfo(
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
std::optional<std::vector<unsigned char>> dbResponse = std::optional<std::vector<unsigned char>> dbResponse =
backend.fetch(key.key.data(), ledgerSequence); backend.fetchLedgerObject(key.key, ledgerSequence);
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
auto time = auto time =
std::chrono::duration_cast<std::chrono::microseconds>(end - start) std::chrono::duration_cast<std::chrono::microseconds>(end - start)

View File

@@ -105,10 +105,10 @@ doAccountTxStoredProcedure(
std::shared_ptr<ripple::STTx const>, std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>>> std::shared_ptr<ripple::STObject const>>>
results; results;
auto dbResults = backend.fetchBatch(nodestoreHashes); auto dbResults = backend.fetchTransactions(nodestoreHashes);
for (auto const& res : dbResults) for (auto const& res : dbResults)
{ {
if (res.first.size() && res.second.size()) if (res.transaction.size() && res.metadata.size())
results.push_back(deserializeTxPlusMeta(res)); results.push_back(deserializeTxPlusMeta(res));
} }
return results; return results;

View File

@@ -90,7 +90,7 @@ loadBookOfferIndexes(
boost::json::object boost::json::object
doBookOffers( doBookOffers(
boost::json::object const& request, boost::json::object const& request,
CassandraFlatMapBackend const& backend, BackendInterface const& backend,
std::shared_ptr<PgPool>& pool) std::shared_ptr<PgPool>& pool)
{ {
std::cout << "enter" << std::endl; std::cout << "enter" << std::endl;
@@ -308,14 +308,14 @@ doBookOffers(
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
ripple::uint256 bookBase = getBookBase(book); ripple::uint256 bookBase = getBookBase(book);
std::vector<CassandraFlatMapBackend::LedgerObject> offers; std::vector<BackendInterface::LedgerObject> offers;
if (!cursor.isZero()) if (!cursor.isZero())
{ {
offers = backend.doBookOffers(bookBase, *sequence, cursor); offers = backend.fetchBookOffers(bookBase, *sequence, cursor);
} }
else else
{ {
offers = backend.doBookOffers(bookBase, *sequence); offers = backend.fetchBookOffers(bookBase, *sequence);
} }
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();

View File

@@ -50,19 +50,9 @@ doLedgerData(
request.contains("binary") ? request.at("binary").as_bool() : false; request.contains("binary") ? request.at("binary").as_bool() : false;
size_t limit = request.contains("limit") ? request.at("limit").as_int64() size_t limit = request.contains("limit") ? request.at("limit").as_int64()
: (binary ? 2048 : 256); : (binary ? 2048 : 256);
std::pair< BackendInterface::LedgerPage page;
std::vector<CassandraFlatMapBackend::LedgerObject>,
std::optional<int64_t>>
resultsPair;
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
if (request.contains("version")) page = backend.fetchLedgerPage(marker, ledger, limit);
{
resultsPair = backend.doUpperBound2(marker, ledger, limit);
}
else
{
resultsPair = backend.doUpperBound(marker, ledger, limit);
}
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
@@ -70,9 +60,8 @@ doLedgerData(
std::chrono::duration_cast<std::chrono::microseconds>(end - start) std::chrono::duration_cast<std::chrono::microseconds>(end - start)
.count(); .count();
boost::json::array objects; boost::json::array objects;
std::vector<CassandraFlatMapBackend::LedgerObject>& results = std::vector<BackendInterface::LedgerObject>& results = page.objects;
resultsPair.first; std::optional<int64_t>& returnedMarker = page.cursor;
std::optional<int64_t>& returnedMarker = resultsPair.second;
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< "doUpperBound returned " << results.size() << " results"; << "doUpperBound returned " << results.size() << " results";
for (auto const& [key, object] : results) for (auto const& [key, object] : results)

View File

@@ -1,4 +1,5 @@
#include <handlers/RPCHelpers.h> #include <handlers/RPCHelpers.h>
#include <reporting/BackendInterface.h>
std::optional<ripple::AccountID> std::optional<ripple::AccountID>
accountFromStringStrict(std::string const& account) accountFromStringStrict(std::string const& account)
@@ -21,20 +22,19 @@ accountFromStringStrict(std::string const& account)
std::pair< std::pair<
std::shared_ptr<ripple::STTx const>, std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>> std::shared_ptr<ripple::STObject const>>
deserializeTxPlusMeta( deserializeTxPlusMeta(BackendInterface::TransactionAndMetadata const& blobs)
std::pair<std::vector<unsigned char>, std::vector<unsigned char>> const&
blobs)
{ {
std::pair< std::pair<
std::shared_ptr<ripple::STTx const>, std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>> std::shared_ptr<ripple::STObject const>>
result; result;
{ {
ripple::SerialIter s{blobs.first.data(), blobs.first.size()}; ripple::SerialIter s{
blobs.transaction.data(), blobs.transaction.size()};
result.first = std::make_shared<ripple::STTx const>(s); result.first = std::make_shared<ripple::STTx const>(s);
} }
{ {
ripple::SerialIter s{blobs.second.data(), blobs.second.size()}; ripple::SerialIter s{blobs.metadata.data(), blobs.metadata.size()};
result.second = result.second =
std::make_shared<ripple::STObject const>(s, ripple::sfMetadata); std::make_shared<ripple::STObject const>(s, ripple::sfMetadata);
} }

View File

@@ -5,15 +5,14 @@
#include <ripple/protocol/STLedgerEntry.h> #include <ripple/protocol/STLedgerEntry.h>
#include <ripple/protocol/STTx.h> #include <ripple/protocol/STTx.h>
#include <boost/json.hpp> #include <boost/json.hpp>
#include <reporting/BackendInterface.h>
std::optional<ripple::AccountID> std::optional<ripple::AccountID>
accountFromStringStrict(std::string const& account); accountFromStringStrict(std::string const& account);
std::pair< std::pair<
std::shared_ptr<ripple::STTx const>, std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>> std::shared_ptr<ripple::STObject const>>
deserializeTxPlusMeta( deserializeTxPlusMeta(BackendInterface::TransactionAndMetadata const& blobs);
std::pair<std::vector<unsigned char>, std::vector<unsigned char>> const&
blobs);
boost::json::object boost::json::object
getJson(ripple::STBase const& obj); getJson(ripple::STBase const& obj);

View File

@@ -51,7 +51,7 @@ doTx(
return response; return response;
} }
auto dbResponse = backend.fetchTransaction(hash.data()); auto dbResponse = backend.fetchTransaction(hash);
if (!dbResponse) if (!dbResponse)
{ {
response["error"] = "Transaction not found in Cassandra"; response["error"] = "Transaction not found in Cassandra";

View File

@@ -457,7 +457,7 @@ public:
book->data()[book->size() - 1 - i] = 0x00; book->data()[book->size() - 1 - i] = 0x00;
} }
} }
backend.store( backend.writeLedgerObject(
std::move(*obj.mutable_key()), std::move(*obj.mutable_key()),
request_.ledger().sequence(), request_.ledger().sequence(),
std::move(*obj.mutable_data()), std::move(*obj.mutable_data()),

View File

@@ -741,6 +741,13 @@ CREATE TABLE IF NOT EXISTS ledgers (
trans_set_hash bytea NOT NULL trans_set_hash bytea NOT NULL
); );
CREATE TABLE IF NOT EXISTS objects (
key bytea PRIMARY KEY,
ledger_seq bigint NOT NULL,
object bytea NOT NULL
);
-- Index for lookups by ledger hash. -- Index for lookups by ledger hash.
CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers
USING hash (ledger_hash); USING hash (ledger_hash);
@@ -748,7 +755,10 @@ CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers
-- Transactions table. Deletes from the ledger table -- Transactions table. Deletes from the ledger table
-- cascade here based on ledger_seq. -- cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS transactions ( CREATE TABLE IF NOT EXISTS transactions (
ledger_seq bigint NOT NULL, hash bytea PRIMARY KEY,
ledger_seq bigint,
transaction bytea,
metadata bytea,
transaction_index bigint NOT NULL, transaction_index bigint NOT NULL,
trans_id bytea NOT NULL, trans_id bytea NOT NULL,
nodestore_hash bytea NOT NULL, nodestore_hash bytea NOT NULL,

View File

@@ -385,7 +385,7 @@ flatMapReadCallback(CassFuture* fut, void* cbData)
return; return;
} }
std::vector<unsigned char> meta{buf2, buf2 + buf2Size}; std::vector<unsigned char> meta{buf2, buf2 + buf2Size};
requestParams.result = std::make_pair(std::move(txn), std::move(meta)); requestParams.result = {std::move(txn), std::move(meta)};
cass_result_free(res); cass_result_free(res);
finish(); finish();
} }
@@ -1462,15 +1462,16 @@ CassandraFlatMapBackend::open()
cass_future_free(prepare_future); cass_future_free(prepare_future);
std::stringstream ss; std::stringstream ss;
ss << "nodestore: error preparing updateLedgerRange : " << rc << ", " ss << "nodestore: error preparing updateLedgerRange : " << rc
<< cass_error_desc(rc); << ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str(); BOOST_LOG_TRIVIAL(error) << ss.str();
continue; continue;
} }
updateLedgerRange_ = cass_future_get_prepared(prepare_future); updateLedgerRange_ = cass_future_get_prepared(prepare_future);
query = {}; query = {};
query << " select header from " << tableName << "ledgers where sequence = ?"; query << " select header from " << tableName
<< "ledgers where sequence = ?";
prepare_future = prepare_future =
cass_session_prepare(session_.get(), query.str().c_str()); cass_session_prepare(session_.get(), query.str().c_str());
@@ -1484,15 +1485,16 @@ CassandraFlatMapBackend::open()
cass_future_free(prepare_future); cass_future_free(prepare_future);
std::stringstream ss; std::stringstream ss;
ss << "nodestore: error preparing selectLedgerBySeq : " << rc << ", " ss << "nodestore: error preparing selectLedgerBySeq : " << rc
<< cass_error_desc(rc); << ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str(); BOOST_LOG_TRIVIAL(error) << ss.str();
continue; continue;
} }
selectLedgerBySeq_ = cass_future_get_prepared(prepare_future); selectLedgerBySeq_ = cass_future_get_prepared(prepare_future);
query = {}; query = {};
query << " select sequence from " << tableName << "ledger_range where is_latest = true"; query << " select sequence from " << tableName
<< "ledger_range where is_latest = true";
prepare_future = prepare_future =
cass_session_prepare(session_.get(), query.str().c_str()); cass_session_prepare(session_.get(), query.str().c_str());
@@ -1506,8 +1508,8 @@ CassandraFlatMapBackend::open()
cass_future_free(prepare_future); cass_future_free(prepare_future);
std::stringstream ss; std::stringstream ss;
ss << "nodestore: error preparing selectLatestLedger : " << rc << ", " ss << "nodestore: error preparing selectLatestLedger : " << rc
<< cass_error_desc(rc); << ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str(); BOOST_LOG_TRIVIAL(error) << ss.str();
continue; continue;
} }

View File

@@ -31,6 +31,7 @@
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <reporting/BackendInterface.h>
#include <reporting/DBHelpers.h> #include <reporting/DBHelpers.h>
void void
@@ -53,7 +54,8 @@ void
flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData); flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData);
void void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData); flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData);
class CassandraFlatMapBackend
class CassandraFlatMapBackend : BackendInterface
{ {
private: private:
// convenience function for one-off queries. For normal reads and writes, // convenience function for one-off queries. For normal reads and writes,
@@ -161,11 +163,11 @@ public:
// Create the table if it doesn't exist already // Create the table if it doesn't exist already
// @param createIfMissing ignored // @param createIfMissing ignored
void void
open(); open() override;
// Close the connection to the database // Close the connection to the database
void void
close() close() override
{ {
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
@@ -256,14 +258,13 @@ public:
open_ = false; open_ = false;
} }
using Blob = std::vector<unsigned char>;
using BlobPair = std::pair<Blob, Blob>;
std::pair< std::pair<
std::vector<BlobPair>, std::vector<BackendInterface::TransactionAndMetadata>,
std::optional<std::pair<uint32_t, uint32_t>>> std::optional<BackendInterface::AccountTransactionsCursor>>
doAccountTx( fetchAccountTransactions(
ripple::AccountID const& account, ripple::AccountID const& account,
std::optional<std::pair<uint32_t, uint32_t>> cursor = {}) std::optional<BackendInterface::AccountTransactionsCursor> const&
cursor) const override
{ {
BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx"; BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx";
CassStatement* statement = cass_prepared_bind(selectAccountTx_); CassStatement* statement = cass_prepared_bind(selectAccountTx_);
@@ -279,11 +280,17 @@ public:
<< cass_error_desc(rc); << cass_error_desc(rc);
return {}; return {};
} }
if (!cursor)
cursor = std::make_pair(INT32_MAX, INT32_MAX);
CassTuple* cassCursor = cass_tuple_new(2); CassTuple* cassCursor = cass_tuple_new(2);
cass_tuple_set_int64(cassCursor, 0, cursor->first); if (cursor)
cass_tuple_set_int64(cassCursor, 1, cursor->second); {
cass_tuple_set_int64(cassCursor, 0, cursor->ledgerSequence);
cass_tuple_set_int64(cassCursor, 1, cursor->transactionIndex);
}
else
{
cass_tuple_set_int64(cassCursor, 0, INT32_MAX);
cass_tuple_set_int64(cassCursor, 1, INT32_MAX);
}
rc = cass_statement_bind_tuple(statement, 1, cassCursor); rc = cass_statement_bind_tuple(statement, 1, cassCursor);
if (rc != CASS_OK) if (rc != CASS_OK)
{ {
@@ -319,6 +326,7 @@ public:
bool more = numRows == 300; bool more = numRows == 300;
CassIterator* iter = cass_iterator_from_result(res); CassIterator* iter = cass_iterator_from_result(res);
std::optional<AccountTransactionsCursor> retCursor;
while (cass_iterator_next(iter)) while (cass_iterator_next(iter))
{ {
CassRow const* row = cass_iterator_get_row(iter); CassRow const* row = cass_iterator_get_row(iter);
@@ -357,12 +365,7 @@ public:
int64_t idxOut; int64_t idxOut;
cass_value_get_int64(seqVal, &seqOut); cass_value_get_int64(seqVal, &seqOut);
cass_value_get_int64(idxVal, &idxOut); cass_value_get_int64(idxVal, &idxOut);
cursor->first = (uint32_t)seqOut; retCursor = {(uint32_t)seqOut, (uint32_t)idxOut};
cursor->second = (uint32_t)idxOut;
}
else
{
cursor = {};
} }
} }
} }
@@ -370,8 +373,7 @@ public:
<< "doAccountTx - populated hashes. num hashes = " << hashes.size(); << "doAccountTx - populated hashes. num hashes = " << hashes.size();
if (hashes.size()) if (hashes.size())
{ {
std::vector<BlobPair> results; return {fetchTransactions(hashes), retCursor};
return {fetchBatch(hashes), cursor};
} }
return {{}, {}}; return {{}, {}};
@@ -411,7 +413,7 @@ public:
writeLedger( writeLedger(
ripple::LedgerInfo const& ledgerInfo, ripple::LedgerInfo const& ledgerInfo,
std::string&& header, std::string&& header,
bool isFirst = false) const bool isFirst = false) const override
{ {
WriteLedgerHeaderCallbackData* headerCb = WriteLedgerHeaderCallbackData* headerCb =
new WriteLedgerHeaderCallbackData( new WriteLedgerHeaderCallbackData(
@@ -581,7 +583,7 @@ public:
} }
std::optional<uint32_t> std::optional<uint32_t>
getLatestLedgerSequence() fetchLatestLedgerSequence() const override
{ {
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
@@ -635,7 +637,7 @@ public:
} }
std::optional<ripple::LedgerInfo> std::optional<ripple::LedgerInfo>
getLedgerBySequence(uint32_t sequence) fetchLedgerBySequence(uint32_t sequence) const override
{ {
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
@@ -705,15 +707,16 @@ public:
// @param key the key of the object // @param key the key of the object
// @param pno object in which to store the result // @param pno object in which to store the result
// @return result status of query // @return result status of query
std::optional<std::vector<unsigned char>> std::optional<BackendInterface::Blob>
fetch(void const* key, uint32_t sequence) const fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence)
const override
{ {
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
CassStatement* statement = cass_prepared_bind(selectObject_); CassStatement* statement = cass_prepared_bind(selectObject_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassError rc = cass_statement_bind_bytes( CassError rc = cass_statement_bind_bytes(
statement, 0, static_cast<cass_byte_t const*>(key), 32); statement, 0, static_cast<cass_byte_t const*>(key.data()), 32);
if (rc != CASS_OK) if (rc != CASS_OK)
{ {
cass_statement_free(statement); cass_statement_free(statement);
@@ -834,16 +837,15 @@ public:
return token + 1; return token + 1;
} }
std::optional< std::optional<BackendInterface::TransactionAndMetadata>
std::pair<std::vector<unsigned char>, std::vector<unsigned char>>> fetchTransaction(ripple::uint256 const& hash) const override
fetchTransaction(void const* hash) const
{ {
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
CassStatement* statement = cass_prepared_bind(selectTransaction_); CassStatement* statement = cass_prepared_bind(selectTransaction_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassError rc = cass_statement_bind_bytes( CassError rc = cass_statement_bind_bytes(
statement, 0, static_cast<cass_byte_t const*>(hash), 32); statement, 0, static_cast<cass_byte_t const*>(hash.data()), 32);
if (rc != CASS_OK) if (rc != CASS_OK)
{ {
cass_statement_free(statement); cass_statement_free(statement);
@@ -911,125 +913,18 @@ public:
<< " microseconds"; << " microseconds";
return {{txResult, metaResult}}; return {{txResult, metaResult}};
} }
struct LedgerObject BackendInterface::LedgerPage
{ fetchLedgerPage(
ripple::uint256 key; std::optional<BackendInterface::LedgerCursor> const& cursor,
std::vector<unsigned char> blob; std::uint32_t ledgerSequence,
}; std::uint32_t limit) const override
std::pair<std::vector<LedgerObject>, std::optional<int64_t>>
doUpperBound2(
std::optional<int64_t> marker,
std::uint32_t seq,
std::uint32_t limit) const
{
BOOST_LOG_TRIVIAL(debug) << "Starting doUpperBound2";
CassStatement* statement = cass_prepared_bind(upperBound2_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
int64_t markerVal = marker ? marker.value() : INT64_MIN;
CassError rc = cass_statement_bind_int64(statement, 0, markerVal);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra hash to doUpperBound query: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
rc = cass_statement_bind_int64(statement, 1, seq);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra seq to doUpperBound query: " << rc << ", "
<< cass_error_desc(rc);
return {};
}
rc = cass_statement_bind_int32(statement, 2, limit);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra limit to doUpperBound query: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
CassFuture* fut;
do
{
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
} while (rc != CASS_OK);
CassResult const* res = cass_future_get_result(fut);
cass_statement_free(statement);
cass_future_free(fut);
BOOST_LOG_TRIVIAL(debug) << "doUpperBound - got keys";
std::vector<LedgerObject> results;
CassIterator* iter = cass_iterator_from_result(res);
while (cass_iterator_next(iter))
{
CassRow const* row = cass_iterator_get_row(iter);
{
CassValue const* tup = cass_row_get_column(row, 0);
CassIterator* tupleIter = cass_iterator_from_tuple(tup);
if (!tupleIter)
continue;
cass_iterator_next(tupleIter);
CassValue const* keyVal = cass_iterator_get_value(tupleIter);
cass_iterator_next(tupleIter);
CassValue const* objectVal = cass_iterator_get_value(tupleIter);
cass_byte_t const* outData;
std::size_t outSize;
cass_value_get_bytes(keyVal, &outData, &outSize);
LedgerObject result;
result.key = ripple::uint256::fromVoid(outData);
cass_value_get_bytes(objectVal, &outData, &outSize);
std::vector<unsigned char> blob{outData, outData + outSize};
result.blob = std::move(blob);
results.push_back(std::move(result));
cass_iterator_free(tupleIter);
}
}
cass_iterator_free(iter);
cass_result_free(res);
BOOST_LOG_TRIVIAL(debug)
<< "doUpperBound2 - populated results. num results = "
<< results.size();
if (results.size())
{
auto token = getToken(results[results.size() - 1].key.data());
assert(token);
return std::make_pair(results, token);
}
return {{}, {}};
}
std::pair<std::vector<LedgerObject>, std::optional<int64_t>>
doUpperBound(
std::optional<int64_t> marker,
std::uint32_t seq,
std::uint32_t limit) const
{ {
BOOST_LOG_TRIVIAL(debug) << "Starting doUpperBound"; BOOST_LOG_TRIVIAL(debug) << "Starting doUpperBound";
CassStatement* statement = cass_prepared_bind(upperBound_); CassStatement* statement = cass_prepared_bind(upperBound_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
int64_t markerVal = marker ? marker.value() : INT64_MIN; int64_t cursorVal = cursor.has_value() ? cursor.value() : INT64_MIN;
CassError rc = cass_statement_bind_int64(statement, 0, markerVal); CassError rc = cass_statement_bind_int64(statement, 0, cursorVal);
if (rc != CASS_OK) if (rc != CASS_OK)
{ {
cass_statement_free(statement); cass_statement_free(statement);
@@ -1039,7 +934,7 @@ public:
return {}; return {};
} }
rc = cass_statement_bind_int64(statement, 1, seq); rc = cass_statement_bind_int64(statement, 1, ledgerSequence);
if (rc != CASS_OK) if (rc != CASS_OK)
{ {
cass_statement_free(statement); cass_statement_free(statement);
@@ -1048,7 +943,7 @@ public:
<< cass_error_desc(rc); << cass_error_desc(rc);
return {}; return {};
} }
rc = cass_statement_bind_int64(statement, 2, seq); rc = cass_statement_bind_int64(statement, 2, ledgerSequence);
if (rc != CASS_OK) if (rc != CASS_OK)
{ {
cass_statement_free(statement); cass_statement_free(statement);
@@ -1116,24 +1011,25 @@ public:
if (keys.size()) if (keys.size())
{ {
std::vector<LedgerObject> results; std::vector<LedgerObject> results;
std::vector<Blob> objs = fetchObjectsBatch(keys, seq); std::vector<BackendInterface::Blob> objs =
fetchLedgerObjects(keys, ledgerSequence);
for (size_t i = 0; i < objs.size(); ++i) for (size_t i = 0; i < objs.size(); ++i)
{ {
results.push_back({keys[i], objs[i]}); results.push_back({keys[i], objs[i]});
} }
auto token = getToken(results[results.size() - 1].key.data()); auto token = getToken(results[results.size() - 1].key.data());
assert(token); assert(token);
return std::make_pair(results, token); return {results, token};
} }
return {{}, {}}; return {{}, {}};
} }
std::vector<LedgerObject> std::vector<BackendInterface::LedgerObject>
doBookOffers( fetchBookOffers(
ripple::uint256 const& book, ripple::uint256 const& book,
uint32_t sequence, uint32_t sequence,
ripple::uint256 const& cursor = {}) const std::optional<ripple::uint256> const& cursor) const override
{ {
BOOST_LOG_TRIVIAL(debug) << "Starting doBookOffers"; BOOST_LOG_TRIVIAL(debug) << "Starting doBookOffers";
CassStatement* statement = cass_prepared_bind(getBook_); CassStatement* statement = cass_prepared_bind(getBook_);
@@ -1168,8 +1064,18 @@ public:
<< ", " << cass_error_desc(rc); << ", " << cass_error_desc(rc);
return {}; return {};
} }
if (cursor)
rc = cass_statement_bind_bytes( rc = cass_statement_bind_bytes(
statement, 3, static_cast<cass_byte_t const*>(cursor.data()), 32); statement,
3,
static_cast<cass_byte_t const*>(cursor->data()),
32);
else
{
ripple::uint256 zero = {};
rc = cass_statement_bind_bytes(
statement, 3, static_cast<cass_byte_t const*>(zero.data()), 32);
}
if (rc != CASS_OK) if (rc != CASS_OK)
{ {
@@ -1228,7 +1134,8 @@ public:
if (keys.size()) if (keys.size())
{ {
std::vector<LedgerObject> results; std::vector<LedgerObject> results;
std::vector<Blob> objs = fetchObjectsBatch(keys, sequence); std::vector<BackendInterface::Blob> objs =
fetchLedgerObjects(keys, sequence);
for (size_t i = 0; i < objs.size(); ++i) for (size_t i = 0; i < objs.size(); ++i)
{ {
results.push_back({keys[i], objs[i]}); results.push_back({keys[i], objs[i]});
@@ -1248,7 +1155,7 @@ public:
{ {
CassandraFlatMapBackend const& backend; CassandraFlatMapBackend const& backend;
ripple::uint256 const& hash; ripple::uint256 const& hash;
BlobPair& result; BackendInterface::TransactionAndMetadata& result;
std::condition_variable& cv; std::condition_variable& cv;
std::atomic_uint32_t& numFinished; std::atomic_uint32_t& numFinished;
@@ -1257,7 +1164,7 @@ public:
ReadCallbackData( ReadCallbackData(
CassandraFlatMapBackend const& backend, CassandraFlatMapBackend const& backend,
ripple::uint256 const& hash, ripple::uint256 const& hash,
BlobPair& result, BackendInterface::TransactionAndMetadata& result,
std::condition_variable& cv, std::condition_variable& cv,
std::atomic_uint32_t& numFinished, std::atomic_uint32_t& numFinished,
size_t batchSize) size_t batchSize)
@@ -1273,8 +1180,8 @@ public:
ReadCallbackData(ReadCallbackData const& other) = default; ReadCallbackData(ReadCallbackData const& other) = default;
}; };
std::vector<BlobPair> std::vector<BackendInterface::TransactionAndMetadata>
fetchBatch(std::vector<ripple::uint256> const& hashes) const fetchTransactions(std::vector<ripple::uint256> const& hashes) const override
{ {
std::size_t const numHashes = hashes.size(); std::size_t const numHashes = hashes.size();
BOOST_LOG_TRIVIAL(trace) BOOST_LOG_TRIVIAL(trace)
@@ -1282,7 +1189,8 @@ public:
std::atomic_uint32_t numFinished = 0; std::atomic_uint32_t numFinished = 0;
std::condition_variable cv; std::condition_variable cv;
std::mutex mtx; std::mutex mtx;
std::vector<BlobPair> results{numHashes}; std::vector<BackendInterface::TransactionAndMetadata> results{
numHashes};
std::vector<std::shared_ptr<ReadCallbackData>> cbs; std::vector<std::shared_ptr<ReadCallbackData>> cbs;
cbs.reserve(numHashes); cbs.reserve(numHashes);
for (std::size_t i = 0; i < hashes.size(); ++i) for (std::size_t i = 0; i < hashes.size(); ++i)
@@ -1338,7 +1246,7 @@ public:
CassandraFlatMapBackend const& backend; CassandraFlatMapBackend const& backend;
ripple::uint256 const& key; ripple::uint256 const& key;
uint32_t sequence; uint32_t sequence;
Blob& result; BackendInterface::Blob& result;
std::condition_variable& cv; std::condition_variable& cv;
std::atomic_uint32_t& numFinished; std::atomic_uint32_t& numFinished;
@@ -1348,7 +1256,7 @@ public:
CassandraFlatMapBackend const& backend, CassandraFlatMapBackend const& backend,
ripple::uint256 const& key, ripple::uint256 const& key,
uint32_t sequence, uint32_t sequence,
Blob& result, BackendInterface::Blob& result,
std::condition_variable& cv, std::condition_variable& cv,
std::atomic_uint32_t& numFinished, std::atomic_uint32_t& numFinished,
size_t batchSize) size_t batchSize)
@@ -1364,10 +1272,10 @@ public:
ReadObjectCallbackData(ReadObjectCallbackData const& other) = default; ReadObjectCallbackData(ReadObjectCallbackData const& other) = default;
}; };
std::vector<Blob> std::vector<BackendInterface::Blob>
fetchObjectsBatch( fetchLedgerObjects(
std::vector<ripple::uint256> const& keys, std::vector<ripple::uint256> const& keys,
uint32_t sequence) const uint32_t sequence) const override
{ {
std::size_t const numKeys = keys.size(); std::size_t const numKeys = keys.size();
BOOST_LOG_TRIVIAL(trace) BOOST_LOG_TRIVIAL(trace)
@@ -1375,7 +1283,7 @@ public:
std::atomic_uint32_t numFinished = 0; std::atomic_uint32_t numFinished = 0;
std::condition_variable cv; std::condition_variable cv;
std::mutex mtx; std::mutex mtx;
std::vector<Blob> results{numKeys}; std::vector<BackendInterface::Blob> results{numKeys};
std::vector<std::shared_ptr<ReadObjectCallbackData>> cbs; std::vector<std::shared_ptr<ReadObjectCallbackData>> cbs;
cbs.reserve(numKeys); cbs.reserve(numKeys);
for (std::size_t i = 0; i < keys.size(); ++i) for (std::size_t i = 0; i < keys.size(); ++i)
@@ -1798,13 +1706,13 @@ public:
cass_future_free(fut); cass_future_free(fut);
} }
void void
store( writeLedgerObject(
std::string&& key, std::string&& key,
uint32_t seq, uint32_t seq,
std::string&& blob, std::string&& blob,
bool isCreated, bool isCreated,
bool isDeleted, bool isDeleted,
std::optional<ripple::uint256>&& book) const std::optional<ripple::uint256>&& book) const override
{ {
BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra";
WriteCallbackData* data = new WriteCallbackData( WriteCallbackData* data = new WriteCallbackData(
@@ -1830,7 +1738,7 @@ public:
} }
void void
storeAccountTx(AccountTransactionsData&& data) const writeAccountTransactions(AccountTransactionsData&& data) const override
{ {
numRequestsOutstanding_ += data.accounts.size(); numRequestsOutstanding_ += data.accounts.size();
WriteAccountTxCallbackData* cbData = WriteAccountTxCallbackData* cbData =
@@ -2023,11 +1931,11 @@ public:
cass_future_free(fut); cass_future_free(fut);
} }
void void
storeTransaction( writeTransaction(
std::string&& hash, std::string&& hash,
uint32_t seq, uint32_t seq,
std::string&& transaction, std::string&& transaction,
std::string&& metadata) std::string&& metadata) const override
{ {
BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra";
WriteTransactionCallbackData* data = new WriteTransactionCallbackData( WriteTransactionCallbackData* data = new WriteTransactionCallbackData(
@@ -2042,7 +1950,7 @@ public:
} }
void void
sync() const sync() const override
{ {
std::unique_lock<std::mutex> lck(syncMutex_); std::unique_lock<std::mutex> lck(syncMutex_);

View File

@@ -76,7 +76,7 @@ ReportingETL::insertTransactions(
auto journal = ripple::debugLog(); auto journal = ripple::debugLog();
accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal); accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal);
std::string keyStr{(const char*)sttx.getTransactionID().data(), 32}; std::string keyStr{(const char*)sttx.getTransactionID().data(), 32};
flatMapBackend_.storeTransaction( flatMapBackend_.writeTransaction(
std::move(keyStr), std::move(keyStr),
ledger.seq, ledger.seq,
std::move(*raw), std::move(*raw),
@@ -89,7 +89,7 @@ std::optional<ripple::LedgerInfo>
ReportingETL::loadInitialLedger(uint32_t startingSequence) ReportingETL::loadInitialLedger(uint32_t startingSequence)
{ {
// check that database is actually empty // check that database is actually empty
auto ledger = flatMapBackend_.getLedgerBySequence(startingSequence); auto ledger = flatMapBackend_.fetchLedgerBySequence(startingSequence);
if (ledger) if (ledger)
{ {
BOOST_LOG_TRIVIAL(fatal) << __func__ << " : " BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
@@ -128,7 +128,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
{ {
for (auto& data : accountTxData) for (auto& data : accountTxData)
{ {
flatMapBackend_.storeAccountTx(std::move(data)); flatMapBackend_.writeAccountTransactions(std::move(data));
} }
bool success = flatMapBackend_.writeLedger( bool success = flatMapBackend_.writeLedger(
lgrInfo, std::move(*ledgerData->mutable_ledger_header())); lgrInfo, std::move(*ledgerData->mutable_ledger_header()));
@@ -155,7 +155,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
size_t numAttempts = 0; size_t numAttempts = 0;
while (!stopping_) while (!stopping_)
{ {
auto ledger = flatMapBackend_.getLedgerBySequence(ledgerSequence); auto ledger = flatMapBackend_.fetchLedgerBySequence(ledgerSequence);
if (!ledger) if (!ledger)
{ {
@@ -292,7 +292,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
} }
assert(not(isCreated and isDeleted)); assert(not(isCreated and isDeleted));
flatMapBackend_.store( flatMapBackend_.writeLedgerObject(
std::move(*obj.mutable_key()), std::move(*obj.mutable_key()),
lgrInfo.seq, lgrInfo.seq,
std::move(*obj.mutable_data()), std::move(*obj.mutable_data()),
@@ -302,7 +302,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
} }
for (auto& data : accountTxData) for (auto& data : accountTxData)
{ {
flatMapBackend_.storeAccountTx(std::move(data)); flatMapBackend_.writeAccountTransactions(std::move(data));
} }
bool success = flatMapBackend_.writeLedger( bool success = flatMapBackend_.writeLedger(
lgrInfo, std::move(*rawData.mutable_ledger_header())); lgrInfo, std::move(*rawData.mutable_ledger_header()));
@@ -347,7 +347,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
<< "Starting etl pipeline"; << "Starting etl pipeline";
writing_ = true; writing_ = true;
auto parent = flatMapBackend_.getLedgerBySequence(startSequence - 1); auto parent = flatMapBackend_.fetchLedgerBySequence(startSequence - 1);
if (!parent) if (!parent)
{ {
assert(false); assert(false);
@@ -428,7 +428,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
if (isStopping()) if (isStopping())
continue; continue;
auto numTxns = fetchResponse->transactions_list().transactions_size(); auto numTxns =
fetchResponse->transactions_list().transactions_size();
auto numObjects = fetchResponse->ledger_objects().objects_size(); auto numObjects = fetchResponse->ledger_objects().objects_size();
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
auto [lgrInfo, success] = buildNextLedger(*fetchResponse); auto [lgrInfo, success] = buildNextLedger(*fetchResponse);
@@ -481,7 +482,7 @@ void
ReportingETL::monitor() ReportingETL::monitor()
{ {
std::optional<uint32_t> latestSequence = std::optional<uint32_t> latestSequence =
flatMapBackend_.getLatestLedgerSequence(); flatMapBackend_.fetchLatestLedgerSequence();
if (!latestSequence) if (!latestSequence)
{ {
BOOST_LOG_TRIVIAL(info) << __func__ << " : " BOOST_LOG_TRIVIAL(info) << __func__ << " : "