write transaction hashes to separate table

This commit is contained in:
CJ Cobb
2021-06-29 17:33:42 +00:00
parent 416a3cc523
commit 9f8724b7ab
4 changed files with 94 additions and 36 deletions

View File

@@ -51,6 +51,13 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func)
delete &requestParams;
}
}
template <class T>
void
processAsyncWrite(CassFuture* fut, void* cbData)
{
T& requestParams = *static_cast<T*>(cbData);
processAsyncWriteResponse(requestParams, fut, requestParams.retry);
}
// Process the result of an asynchronous write. Retry on error
// @param fut cassandra future associated with the write
// @param cbData struct that holds the request parameters
@@ -217,6 +224,54 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData)
}
}
template <class T, class F>
struct CallbackData
{
CassandraBackend const* backend;
T data;
F retry;
uint32_t currentRetries;
std::atomic<int> refs = 1;
CallbackData(CassandraBackend const* b, T&& d, F f)
: backend(b), data(std::move(d)), retry(f)
{
}
};
void
CassandraBackend::writeTransaction(
std::string&& hash,
uint32_t seq,
std::string&& transaction,
std::string&& metadata) const
{
BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra";
std::string hashCpy = hash;
auto func = [this](auto& params, bool retry) {
CassandraStatement statement{insertLedgerTransaction_};
statement.bindInt(params.data.first);
statement.bindBytes(params.data.second);
executeAsyncWrite(
statement,
processAsyncWrite<
typename std::remove_reference<decltype(params)>::type>,
params,
retry);
};
auto* lgrSeqToHash =
new CallbackData(this, std::make_pair(seq, std::move(hashCpy)), func);
WriteTransactionCallbackData* data = new WriteTransactionCallbackData(
this,
std::move(hash),
seq,
std::move(transaction),
std::move(metadata));
writeTransaction(*data, false);
func(*lgrSeqToHash, false);
}
std::optional<LedgerRange>
CassandraBackend::fetchLedgerRange() const
{
@@ -243,21 +298,8 @@ CassandraBackend::fetchLedgerRange() const
std::vector<TransactionAndMetadata>
CassandraBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const
{
CassandraStatement statement{selectAllTransactionsInLedger_};
statement.bindInt(ledgerSequence);
CassandraResult result = executeSyncRead(statement);
if (!result)
{
BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows";
return {};
}
std::vector<TransactionAndMetadata> txns;
do
{
txns.push_back(
{result.getBytes(), result.getBytes(), result.getUInt32()});
} while (result.nextRow());
return txns;
auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence);
return fetchTransactions(hashes);
}
std::vector<ripple::uint256>
CassandraBackend::fetchAllTransactionHashesInLedger(
@@ -1373,6 +1415,14 @@ CassandraBackend::open(bool readOnly)
<< " WITH default_time_to_live = " << std::to_string(ttl);
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix
<< "ledger_transactions"
<< " ( ledger_sequence bigint, hash blob, PRIMARY "
"KEY(ledger_sequence, hash))"
<< " WITH default_time_to_live = " << std::to_string(ttl);
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "transactions"
@@ -1483,6 +1533,12 @@ CassandraBackend::open(bool readOnly)
"?, ?)";
if (!insertTransaction_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "INSERT INTO " << tablePrefix << "ledger_transactions"
<< " (ledger_sequence, hash) VALUES "
"(?, ?)";
if (!insertLedgerTransaction_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "INSERT INTO " << tablePrefix << "keys"
@@ -1519,7 +1575,7 @@ CassandraBackend::open(bool readOnly)
query, session_.get()))
continue;
query.str("");
query << "SELECT hash FROM " << tablePrefix << "transactions"
query << "SELECT hash FROM " << tablePrefix << "ledger_transactions"
<< " WHERE ledger_sequence = ?";
if (!selectAllTransactionHashesInLedger_.prepareStatement(
query, session_.get()))

View File

@@ -26,13 +26,13 @@
#include <boost/json.hpp>
#include <boost/log/trivial.hpp>
#include <atomic>
#include <backend/BackendInterface.h>
#include <backend/DBHelpers.h>
#include <cassandra.h>
#include <cstddef>
#include <iostream>
#include <memory>
#include <mutex>
#include <backend/BackendInterface.h>
#include <backend/DBHelpers.h>
namespace Backend {
@@ -633,6 +633,7 @@ private:
// than making a new statement
CassandraPreparedStatement insertObject_;
CassandraPreparedStatement insertTransaction_;
CassandraPreparedStatement insertLedgerTransaction_;
CassandraPreparedStatement selectTransaction_;
CassandraPreparedStatement selectAllTransactionsInLedger_;
CassandraPreparedStatement selectAllTransactionHashesInLedger_;
@@ -1319,18 +1320,7 @@ public:
std::string&& hash,
uint32_t seq,
std::string&& transaction,
std::string&& metadata) const override
{
BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra";
WriteTransactionCallbackData* data = new WriteTransactionCallbackData(
this,
std::move(hash),
seq,
std::move(transaction),
std::move(metadata));
writeTransaction(*data, false);
}
std::string&& metadata) const override;
void
startWrites() const override

View File

@@ -529,7 +529,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
uint32_t currentSequence = startSequence;
int counter = 0;
std::atomic_int per = 4;
std::atomic_int per = 100;
auto startTimer = [this, &per]() {
auto innerFunc = [this, &per](auto& f) -> void {
std::shared_ptr<boost::asio::steady_timer> timer =
@@ -549,7 +549,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
};
innerFunc(innerFunc);
};
startTimer();
// startTimer();
auto begin = std::chrono::system_clock::now();
while (!writeConflict)
@@ -608,6 +609,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
deleting_ = false;
});
}
/*
if (++counter >= per)
{
std::chrono::milliseconds sleep =
@@ -619,6 +621,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
counter = 0;
begin = std::chrono::system_clock::now();
}
*/
}
}};