Refactor of CassandraBackend. Compiles, not tested.

CJ Cobb
2021-03-03 16:26:34 -05:00
parent 308c585801
commit 997e3ac2e7
2 changed files with 616 additions and 1233 deletions


@@ -1,5 +1,37 @@
#include <reporting/CassandraBackend.h>
namespace Backend {
template <class T, class F>
void
processAsyncWriteResponse(T&& requestParams, CassFuture* fut, F func)
{
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.getIOContext(),
std::chrono::steady_clock::now() + wait);
// capture func by value; a reference to the by-value parameter would dangle
// by the time the timer fires
timer->async_wait([timer, &requestParams, func](
const boost::system::error_code& error) {
func(requestParams, true);
});
}
else
{
backend.finishAsyncWrite();
int remaining = --requestParams.refs;
if (remaining == 0)
delete &requestParams;
}
}
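// --- Editorial sketch, not part of this commit -------------------------------
// processAsyncWriteResponse is a template, so it only assumes that the
// requestParams object exposes roughly the members below (plus whatever the
// concrete write call needs). The real callback-data structs are declared in
// CassandraBackend.h, whose diff is suppressed, so these names are assumptions.
// Needs <atomic>; CassandraBackend comes from the header included above.
struct ExampleWriteCallbackData
{
    CassandraBackend const* backend;   // supplies getIOContext()/finishAsyncWrite()
    uint32_t currentRetries = 0;       // drives the capped 2^n ms backoff above
    std::atomic<int> refs{1};          // struct is deleted when this reaches zero
};
// -----------------------------------------------------------------------------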
// Process the result of an asynchronous write. Retry on error
// @param fut cassandra future associated with the write
// @param cbData struct that holds the request parameters
@@ -8,72 +40,31 @@ flatMapWriteCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.write(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->write(params, retry);
};
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
int remaining = --requestParams.refs;
if (remaining == 0)
delete &requestParams;
}
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
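// --- Editorial reconstruction, reading only the added lines of the hunk above.
// With the retry and bookkeeping logic moved into processAsyncWriteResponse,
// each write callback reduces to: cast the callback data, wrap the matching
// write call in a retry functor, and delegate. The Book, Transaction,
// AccountTx, LedgerHeader and LedgerHash callbacks below follow the same shape
// with their own write calls.
void
exampleFlatMapWriteCallback(CassFuture* fut, void* cbData)
{
    CassandraBackend::WriteCallbackData& requestParams =
        *static_cast<CassandraBackend::WriteCallbackData*>(cbData);
    auto func = [&requestParams](auto& params, bool retry) {
        requestParams.backend->write(params, retry);
    };
    processAsyncWriteResponse(std::move(requestParams), fut, func);
}
// -----------------------------------------------------------------------------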
void
flatMapWriteBookCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeBook(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->writeBook(params, retry);
};
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
int remaining = --requestParams.refs;
if (remaining == 0)
delete &requestParams;
}
void
retryWriteKey(CassandraBackend::WriteCallbackData& requestParams, bool isRetry)
{
auto const& backend = *requestParams.backend;
if (requestParams.isDeleted)
backend.writeDeletedKey(requestParams, true);
else
backend.writeKey(requestParams, true);
}
void
@@ -81,39 +72,7 @@ flatMapWriteKeyCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
if (requestParams.isDeleted)
backend.writeDeletedKey(requestParams, true);
else
backend.writeKey(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
int remaining = --requestParams.refs;
if (remaining == 0)
delete &requestParams;
}
processAsyncWriteResponse(std::move(requestParams), fut, retryWriteKey);
}
void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData)
@@ -148,30 +107,16 @@ flatMapGetCreatedCallback(CassFuture* fut, void* cbData)
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
};
CassResult const* res = cass_future_get_result(fut);
CassandraResult result{cass_future_get_result(fut)};
CassRow const* row = cass_result_first_row(res);
if (!row)
if (!result)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc
<< ", " << cass_error_desc(rc);
finish();
return;
}
cass_int64_t created;
rc = cass_value_get_int64(cass_row_get_column(row, 0), &created);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error)
<< "Cassandra fetch get bytes error : " << rc << ", "
<< cass_error_desc(rc);
finish();
return;
}
cass_result_free(res);
requestParams.createdSequence = created;
requestParams.createdSequence = result.getUInt32();
backend.writeDeletedKey(requestParams, false);
}
}
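// --- Editorial sketch, not part of this commit -------------------------------
// The refactor replaces raw CassResult/CassRow handling with a CassandraResult
// wrapper offering operator bool, getUInt32(), getBytes() and nextRow(). Its
// real definition is in CassandraBackend.h (diff suppressed); a minimal RAII
// shape consistent with how it is used here could look like this. Error
// handling and member names beyond the ones used above are assumptions.
class ExampleCassandraResult
{
    CassResult const* result_ = nullptr;
    CassIterator* iter_ = nullptr;
    CassRow const* row_ = nullptr;
    size_t curColumn_ = 0;  // next column to read; reset by nextRow()

public:
    explicit ExampleCassandraResult(CassResult const* result) : result_(result)
    {
        if (!result_)
            return;
        iter_ = cass_iterator_from_result(result_);
        if (cass_iterator_next(iter_))
            row_ = cass_iterator_get_row(iter_);
    }

    // owns the CassResult, so copying would double-free
    ExampleCassandraResult(ExampleCassandraResult const&) = delete;
    ExampleCassandraResult&
    operator=(ExampleCassandraResult const&) = delete;

    // false when the query returned no rows
    explicit operator bool() const
    {
        return row_ != nullptr;
    }

    uint32_t
    getUInt32()
    {
        cass_int64_t value = 0;
        cass_value_get_int64(cass_row_get_column(row_, curColumn_++), &value);
        return static_cast<uint32_t>(value);
    }

    std::vector<unsigned char>
    getBytes()
    {
        cass_byte_t const* buf = nullptr;
        std::size_t bufSize = 0;
        cass_value_get_bytes(
            cass_row_get_column(row_, curColumn_++), &buf, &bufSize);
        return {buf, buf + bufSize};
    }

    bool
    nextRow()
    {
        curColumn_ = 0;
        row_ = cass_iterator_next(iter_) ? cass_iterator_get_row(iter_) : nullptr;
        return row_ != nullptr;
    }

    ~ExampleCassandraResult()
    {
        if (iter_)
            cass_iterator_free(iter_);
        if (result_)
            cass_result_free(result_);
    }
};
// -----------------------------------------------------------------------------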
@@ -180,138 +125,41 @@ flatMapWriteTransactionCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteTransactionCallbackData& requestParams =
*static_cast<CassandraBackend::WriteTransactionCallbackData*>(cbData);
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeTransaction(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
delete &requestParams;
}
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->writeTransaction(params, retry);
};
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
void
flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteAccountTxCallbackData& requestParams =
*static_cast<CassandraBackend::WriteAccountTxCallbackData*>(cbData);
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeAccountTx(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
int remaining = --requestParams.refs;
if (remaining == 0)
delete &requestParams;
}
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->writeAccountTx(params, retry);
};
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
void
flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteLedgerHeaderCallbackData& requestParams =
*static_cast<CassandraBackend::WriteLedgerHeaderCallbackData*>(cbData);
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeLedgerHeader(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
delete &requestParams;
}
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->writeLedgerHeader(params, retry);
};
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteLedgerHashCallbackData& requestParams =
*static_cast<CassandraBackend::WriteLedgerHashCallbackData*>(cbData);
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeLedgerHash(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
delete &requestParams;
}
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->writeLedgerHash(params, retry);
};
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
// Process the result of an asynchronous read. Retry on error
@@ -322,69 +170,15 @@ flatMapReadCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::ReadCallbackData& requestParams =
*static_cast<CassandraBackend::ReadCallbackData*>(cbData);
auto func = [&requestParams](auto& params) {
requestParams.backend.read(params);
};
CassandraAsyncResult asyncResult{requestParams, fut, func};
CassandraResult& result = asyncResult.getResult();
CassError rc = cass_future_error_code(fut);
if (rc != CASS_OK)
if (!!result)
{
BOOST_LOG_TRIVIAL(warning) << "Cassandra fetch error : " << rc << " : "
<< cass_error_desc(rc) << " - retrying";
// Retry right away. The only time the cluster should ever be overloaded
// is when the very first ledger is being written in full (millions of
// writes at once), during which no reads should be occurring. If reads
// are timing out, the code/architecture should be modified to handle
// greater read load, as opposed to just exponential backoff
requestParams.backend.read(requestParams);
}
else
{
auto finish = [&requestParams]() {
size_t batchSize = requestParams.batchSize;
if (++(requestParams.numFinished) == batchSize)
requestParams.cv.notify_all();
};
CassResult const* res = cass_future_get_result(fut);
CassRow const* row = cass_result_first_row(res);
if (!row)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc
<< ", " << cass_error_desc(rc);
finish();
return;
}
cass_byte_t const* buf;
std::size_t bufSize;
rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error)
<< "Cassandra fetch get bytes error : " << rc << ", "
<< cass_error_desc(rc);
finish();
return;
}
std::vector<unsigned char> txn{buf, buf + bufSize};
cass_byte_t const* buf2;
std::size_t buf2Size;
rc =
cass_value_get_bytes(cass_row_get_column(row, 1), &buf2, &buf2Size);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error)
<< "Cassandra fetch get bytes error : " << rc << ", "
<< cass_error_desc(rc);
finish();
return;
}
std::vector<unsigned char> meta{buf2, buf2 + buf2Size};
requestParams.result = {std::move(txn), std::move(meta)};
cass_result_free(res);
finish();
requestParams.result = {result.getBytes(), result.getBytes()};
}
}
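// --- Editorial sketch, not part of this commit -------------------------------
// CassandraAsyncResult (declared in the suppressed header) appears to bundle
// the boilerplate the old read callbacks repeated: check the future's error
// code, re-submit the read via the supplied retry functor on failure, expose a
// CassandraResult otherwise, and signal batch completion on destruction. A
// possible shape, assuming CassandraResult is default-constructible and
// movable; names beyond those used above are assumptions.
template <class T>
class ExampleCassandraAsyncResult
{
    T& requestParams_;
    CassandraResult result_;
    bool retried_ = false;

public:
    template <class F>
    ExampleCassandraAsyncResult(T& requestParams, CassFuture* fut, F retry)
        : requestParams_(requestParams)
    {
        CassError rc = cass_future_error_code(fut);
        if (rc != CASS_OK)
        {
            BOOST_LOG_TRIVIAL(warning)
                << "Cassandra fetch error : " << rc << " : "
                << cass_error_desc(rc) << " - retrying";
            retried_ = true;
            retry(requestParams_);  // retry right away, as the old code did
        }
        else
        {
            result_ = CassandraResult{cass_future_get_result(fut)};
        }
    }

    CassandraResult&
    getResult()
    {
        return result_;
    }

    ~ExampleCassandraAsyncResult()
    {
        // a re-submitted read finishes the batch from its own callback
        if (retried_)
            return;
        if (++requestParams_.numFinished == requestParams_.batchSize)
            requestParams_.cv.notify_all();
    }
};
// -----------------------------------------------------------------------------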
@@ -396,59 +190,15 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::ReadObjectCallbackData& requestParams =
*static_cast<CassandraBackend::ReadObjectCallbackData*>(cbData);
auto func = [&requestParams](auto& params) {
requestParams.backend.readObject(params);
};
CassandraAsyncResult asyncResult{requestParams, fut, func};
CassandraResult& result = asyncResult.getResult();
CassError rc = cass_future_error_code(fut);
if (rc != CASS_OK)
if (!!result)
{
BOOST_LOG_TRIVIAL(warning) << "Cassandra fetch error : " << rc << " : "
<< cass_error_desc(rc) << " - retrying";
// Retry right away. The only time the cluster should ever be overloaded
// is when the very first ledger is being written in full (millions of
// writes at once), during which no reads should be occurring. If reads
// are timing out, the code/architecture should be modified to handle
// greater read load, as opposed to just exponential backoff
requestParams.backend.readObject(requestParams);
}
else
{
auto finish = [&requestParams]() {
BOOST_LOG_TRIVIAL(trace)
<< "flatMapReadObjectCallback - finished a read";
size_t batchSize = requestParams.batchSize;
if (++(requestParams.numFinished) == batchSize)
requestParams.cv.notify_all();
};
CassResult const* res = cass_future_get_result(fut);
CassRow const* row = cass_result_first_row(res);
if (!row)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error)
<< "Cassandra fetch get row error : " << rc << ", "
<< cass_error_desc(rc)
<< " key = " << ripple::strHex(requestParams.key);
finish();
return;
}
cass_byte_t const* buf;
std::size_t bufSize;
rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error)
<< "Cassandra fetch get bytes error : " << rc << ", "
<< cass_error_desc(rc);
finish();
return;
}
std::vector<unsigned char> obj{buf, buf + bufSize};
requestParams.result = std::move(obj);
cass_result_free(res);
finish();
requestParams.result = result.getBytes();
}
}
@@ -456,68 +206,24 @@ std::optional<LedgerRange>
CassandraBackend::fetchLedgerRange() const
{
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
auto start = std::chrono::system_clock::now();
CassStatement* statement = cass_prepared_bind(selectLedgerRange_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassFuture* fut;
CassError rc;
do
CassandraStatement statement{selectLedgerRange_};
CassandraResult result = executeSyncRead(statement);
if (!result)
{
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
} while (rc != CASS_OK);
CassResult const* res = cass_future_get_result(fut);
cass_statement_free(statement);
cass_future_free(fut);
CassIterator* iter = cass_iterator_from_result(res);
std::optional<uint32_t> min;
std::optional<uint32_t> max;
if (cass_iterator_next(iter))
{
cass_int64_t sequence;
rc = cass_value_get_int64(
cass_row_get_column(cass_iterator_get_row(iter), 0), &sequence);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
LedgerRange range;
range.minSequence = sequence;
if (!cass_iterator_next(iter))
{
cass_result_free(res);
return range;
}
rc = cass_value_get_int64(
cass_row_get_column(cass_iterator_get_row(iter), 0), &sequence);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
cass_result_free(res);
range.maxSequence = sequence;
if (range.minSequence > range.maxSequence)
{
std::swap(range.minSequence, range.maxSequence);
}
return range;
BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows";
return {};
}
return {};
LedgerRange range;
range.maxSequence = range.minSequence = result.getUInt32();
if (result.nextRow())
{
range.maxSequence = result.getUInt32();
}
if (range.minSequence > range.maxSequence)
{
std::swap(range.minSequence, range.maxSequence);
}
return range;
}
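// --- Editorial sketch, not part of this commit -------------------------------
// executeSyncRead replaces the old inline execute-until-CASS_OK loop. Judging
// by the code it replaces, it keeps executing the statement until the future
// succeeds and then wraps the result. A possible shape (a free-function
// stand-in for what is presumably a CassandraBackend member; statement.get()
// returning the raw CassStatement* is an assumption):
CassandraResult
exampleExecuteSyncRead(CassSession* session, CassandraStatement const& statement)
{
    CassFuture* fut = nullptr;
    CassError rc;
    do
    {
        fut = cass_session_execute(session, statement.get());
        rc = cass_future_error_code(fut);
        if (rc != CASS_OK)
        {
            BOOST_LOG_TRIVIAL(warning)
                << "Cassandra fetch error, retrying: " << cass_error_desc(rc);
            cass_future_free(fut);
        }
    } while (rc != CASS_OK);

    CassandraResult result{cass_future_get_result(fut)};
    cass_future_free(fut);
    return result;
}
// -----------------------------------------------------------------------------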
void
@@ -1098,13 +804,13 @@ CassandraBackend::open()
query << "INSERT INTO " << tableName << "flattransactions"
<< " (hash, sequence, transaction, metadata) VALUES (?, ?, "
"?, ?)";
if (!insertTransaction_.prepare(query, session_.get()))
if (!insertTransaction_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tableName << "keys"
<< " (key, created, deleted) VALUES (?, ?, ?)";
if (!insertKey_.prepare(query, session_.get()))
if (!insertKey_.prepareStatement(query, session_.get()))
continue;
query = {};
@@ -1150,7 +856,8 @@ CassandraBackend::open()
/*
query = {};
query << "SELECT filterempty(key,object) FROM " << tableName << "flat "
query << "SELECT filterempty(key,object) FROM " << tableName <<
"flat "
<< " WHERE TOKEN(key) >= ? and sequence <= ?"
<< " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING";
@@ -1172,8 +879,6 @@ CassandraBackend::open()
if (!getBook_.prepareStatement(query, session_.get()))
continue;
getBook_ = cass_future_get_prepared(prepare_future);
query = {};
query << " INSERT INTO " << tableName << "account_tx"
<< " (account, seq_idx, hash) "
@@ -1224,8 +929,6 @@ CassandraBackend::open()
if (!selectLedgerRange_.prepareStatement(query, session_.get()))
continue;
selectLedgerRange_ = cass_future_get_prepared(prepare_future);
setupPreparedStatements = true;
}
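// --- Editorial sketch, not part of this commit -------------------------------
// The prepared-statement members now expose prepareStatement(query, session),
// which appears to fold in the cass_session_prepare / cass_future_get_prepared
// boilerplate that open() used to repeat, returning false so the setup loop can
// retry. A possible shape (free-function stand-in; the wrapper type and its
// stored handle are assumptions):
bool
examplePrepareStatement(
    std::stringstream const& query,
    CassSession* session,
    CassPrepared const*& prepared)
{
    CassFuture* prepareFuture =
        cass_session_prepare(session, query.str().c_str());
    CassError rc = cass_future_error_code(prepareFuture);
    if (rc != CASS_OK)
    {
        BOOST_LOG_TRIVIAL(error)
            << "Error preparing statement: " << cass_error_desc(rc)
            << " query: " << query.str();
        cass_future_free(prepareFuture);
        return false;
    }
    prepared = cass_future_get_prepared(prepareFuture);
    cass_future_free(prepareFuture);
    return true;
}
// -----------------------------------------------------------------------------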

File diff suppressed because it is too large