refactor of cassandra backend, needs testing

This commit is contained in:
CJ Cobb
2021-07-12 20:57:03 +00:00
parent 607ea0a76e
commit 599bd1b655
2 changed files with 75 additions and 307 deletions

View File

@@ -2,19 +2,6 @@
#include <backend/DBHelpers.h>
#include <functional>
#include <unordered_map>
/*
namespace std {
template <>
struct hash<ripple::uint256>
{
std::size_t
operator()(const ripple::uint256& k) const noexcept
{
return boost::hash_range(k.begin(), k.end());
}
};
} // namespace std
*/
namespace Backend {
template <class T, class F>
void
@@ -53,104 +40,20 @@ void
processAsyncWrite(CassFuture* fut, void* cbData)
{
T& requestParams = *static_cast<T*>(cbData);
// TODO don't pass in func
processAsyncWriteResponse(requestParams, fut, requestParams.retry);
}
/*
template <class T>
void
processAsyncRead(CassFuture* fut, void* cbData)
{
T& requestParams = *static_cast<T*>(cbData);
CassError rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
requestParams.result = {};
}
else
{
CassandraResult result =
std::move(CassandraResult(cass_future_get_result(fut)));
requestParams.populate(result);
std::lock_guard lck(requestParams.mtx);
size_t batchSize = requestParams.batchSize;
if (++(requestParams_.numFinished) == batchSize)
requestParams_.cv.notify_all();
}
}
*/
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
flatMapReadCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::ReadCallbackData& requestParams =
*static_cast<CassandraBackend::ReadCallbackData*>(cbData);
auto func = [](auto& params) { params.backend.read(params); };
CassandraAsyncResult asyncResult{requestParams, fut, func};
if (asyncResult.timedOut())
requestParams.result.transaction = {0};
CassandraResult& result = asyncResult.getResult();
if (!!result)
{
requestParams.result = {
result.getBytes(), result.getBytes(), result.getUInt32()};
}
}
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
flatMapReadObjectCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::ReadObjectCallbackData& requestParams =
*static_cast<CassandraBackend::ReadObjectCallbackData*>(cbData);
auto func = [](auto& params) { params.backend.readObject(params); };
CassandraAsyncResult asyncResult{requestParams, fut, func};
if (asyncResult.timedOut())
requestParams.result = {0};
CassandraResult& result = asyncResult.getResult();
if (!!result)
{
requestParams.result = result.getBytes();
}
}
/*
template <class T, class Result>
struct ReadCallbackData
{
using Finisher = std::function<Result(CassandraResult)>;
T data;
CassandraBackend const* backend;
Finisher finish;
ReadCallbackData(CassandraBackend const* backend, T&& d, Finisher f)
: backend(b), data(d), finish(f)
{
}
void
finish(CassandraResult& res)
{
finish(res)
}
};
*/
template <class T, class B>
struct CallbackData
struct WriteCallbackData
{
CassandraBackend const* backend;
T data;
std::function<void(CallbackData<T, B>&, bool)> retry;
std::function<void(WriteCallbackData<T, B>&, bool)> retry;
uint32_t currentRetries;
std::atomic<int> refs = 1;
CallbackData(CassandraBackend const* b, T&& d, B bind)
WriteCallbackData(CassandraBackend const* b, T&& d, B bind)
: backend(b), data(std::move(d))
{
retry = [bind, this](auto& params, bool isRetry) {
@@ -177,12 +80,12 @@ struct CallbackData
if (remaining == 0)
delete this;
}
virtual ~CallbackData()
virtual ~WriteCallbackData()
{
}
};
template <class T, class B>
struct BulkWriteCallbackData : public CallbackData<T, B>
struct BulkWriteCallbackData : public WriteCallbackData<T, B>
{
std::mutex& mtx;
std::condition_variable& cv;
@@ -194,7 +97,7 @@ struct BulkWriteCallbackData : public CallbackData<T, B>
std::atomic_int& r,
std::mutex& m,
std::condition_variable& c)
: CallbackData<T, B>(b, std::move(d), bind)
: WriteCallbackData<T, B>(b, std::move(d), bind)
, numRemaining(r)
, mtx(m)
, cv(c)
@@ -211,8 +114,8 @@ struct BulkWriteCallbackData : public CallbackData<T, B>
{
// TODO: it would be nice to avoid this lock.
std::lock_guard lck(mtx);
--numRemaining;
cv.notify_one();
if (--numRemaining == 0)
cv.notify_one();
}
~BulkWriteCallbackData()
{
@@ -223,7 +126,7 @@ template <class T, class B>
void
makeAndExecuteAsyncWrite(CassandraBackend const* b, T&& d, B bind)
{
auto* cb = new CallbackData(b, std::move(d), bind);
auto* cb = new WriteCallbackData(b, std::move(d), bind);
cb->start();
}
template <class T, class B>
@@ -380,12 +283,53 @@ CassandraBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const
auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence);
return fetchTransactions(hashes);
}
struct ReadCallbackData
{
std::function<void(CassandraResult&)> onSuccess;
std::atomic_int& numOutstanding;
std::mutex& mtx;
std::condition_variable& cv;
bool errored = false;
ReadCallbackData(
std::atomic_int& numOutstanding,
std::mutex& m,
std::condition_variable& cv,
std::function<void(CassandraResult&)> onSuccess)
: numOutstanding(numOutstanding), mtx(m), cv(cv), onSuccess(onSuccess)
{
}
void
finish(CassFuture* fut)
{
CassError rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
errored = true;
}
else
{
CassandraResult result{cass_future_get_result(fut)};
onSuccess(result);
}
std::lock_guard lck(mtx);
if (--numOutstanding == 0)
cv.notify_one();
}
};
void
processAsyncRead(CassFuture* fut, void* cbData)
{
ReadCallbackData cb = *static_cast<ReadCallbackData*>(cbData);
cb.finish(fut);
}
std::vector<TransactionAndMetadata>
CassandraBackend::fetchTransactions(
std::vector<ripple::uint256> const& hashes) const
{
std::size_t const numHashes = hashes.size();
std::atomic_uint32_t numFinished = 0;
std::atomic_int numOutstanding = numHashes;
std::condition_variable cv;
std::mutex mtx;
std::vector<TransactionAndMetadata> results{numHashes};
@@ -394,19 +338,23 @@ CassandraBackend::fetchTransactions(
auto start = std::chrono::system_clock::now();
for (std::size_t i = 0; i < hashes.size(); ++i)
{
CassandraStatement statement{selectTransaction_};
statement.bindBytes(hashes[i]);
cbs.push_back(std::make_shared<ReadCallbackData>(
*this, hashes[i], results[i], cv, numFinished, numHashes));
read(*cbs[i]);
numOutstanding, mtx, cv, [i, &results](auto& result) {
results[i] = {
result.getBytes(), result.getBytes(), result.getUInt32()};
}));
executeAsyncRead(statement, processAsyncRead, *cbs[i]);
}
assert(results.size() == cbs.size());
std::unique_lock<std::mutex> lck(mtx);
cv.wait(
lck, [&numFinished, &numHashes]() { return numFinished == numHashes; });
cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; });
auto end = std::chrono::system_clock::now();
for (auto const& res : results)
for (auto const& cb : cbs)
{
if (res.transaction.size() == 1 && res.transaction[0] == 0)
if (cb->errored)
throw DatabaseTimeout();
}
@@ -522,25 +470,30 @@ CassandraBackend::fetchLedgerObjects(
std::size_t const numKeys = keys.size();
BOOST_LOG_TRIVIAL(trace)
<< "Fetching " << numKeys << " records from Cassandra";
std::atomic_uint32_t numFinished = 0;
std::atomic_int numOutstanding = numKeys;
std::condition_variable cv;
std::mutex mtx;
std::vector<Blob> results{numKeys};
std::vector<std::shared_ptr<ReadObjectCallbackData>> cbs;
std::vector<std::shared_ptr<ReadCallbackData>> cbs;
cbs.reserve(numKeys);
for (std::size_t i = 0; i < keys.size(); ++i)
{
cbs.push_back(std::make_shared<ReadObjectCallbackData>(
*this, keys[i], sequence, results[i], cv, numFinished, numKeys));
readObject(*cbs[i]);
cbs.push_back(std::make_shared<ReadCallbackData>(
numOutstanding, mtx, cv, [i, &results](auto& result) {
results[i] = result.getBytes();
}));
CassandraStatement statement{selectObject_};
statement.bindBytes(keys[i]);
statement.bindInt(sequence);
executeAsyncRead(statement, processAsyncRead, *cbs[i]);
}
assert(results.size() == cbs.size());
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numFinished, &numKeys]() { return numFinished == numKeys; });
for (auto const& res : results)
cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; });
for (auto const& cb : cbs)
{
if (res.size() == 1 && res[0] == 0)
if (cb->errored)
throw DatabaseTimeout();
}
@@ -1220,13 +1173,6 @@ CassandraBackend::open(bool readOnly)
<< " is_latest IN (true, false)";
if (!selectLedgerRange_.prepareStatement(query, session_.get()))
continue;
/*
query.str("");
query << " SELECT key,object FROM " << tablePrefix
<< "objects WHERE sequence = ?";
if (!selectLedgerDiff_.prepareStatement(query, session_.get()))
continue;
*/
setupPreparedStatements = true;
}

View File

@@ -36,13 +36,6 @@
namespace Backend {
void
flatMapReadCallback(CassFuture* fut, void* cbData);
void
flatMapReadObjectCallback(CassFuture* fut, void* cbData);
void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData);
class CassandraPreparedStatement
{
private:
@@ -521,63 +514,6 @@ isTimeout(CassError rc)
return true;
return false;
}
template <class T, class F>
class CassandraAsyncResult
{
T& requestParams_;
CassandraResult result_;
bool timedOut_ = false;
bool retryOnTimeout_ = false;
public:
CassandraAsyncResult(
T& requestParams,
CassFuture* fut,
F retry,
bool retryOnTimeout = false)
: requestParams_(requestParams), retryOnTimeout_(retryOnTimeout)
{
CassError rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
// TODO - should we ever be retrying requests? These are reads,
// and they usually only fail when the db is under heavy load. Seems
// best to just return an error to the client and have the client
// try again
if (isTimeout(rc))
timedOut_ = true;
if (!timedOut_ || retryOnTimeout_)
retry(requestParams_);
}
else
{
result_ = std::move(CassandraResult(cass_future_get_result(fut)));
}
}
~CassandraAsyncResult()
{
if (result_.isOk() || timedOut_)
{
BOOST_LOG_TRIVIAL(trace) << "finished a request";
size_t batchSize = requestParams_.batchSize;
if (++(requestParams_.numFinished) == batchSize)
requestParams_.cv.notify_all();
}
}
CassandraResult&
getResult()
{
return result_;
}
bool
timedOut()
{
return timedOut_;
}
};
class CassandraBackend : public BackendInterface
{
@@ -629,7 +565,6 @@ private:
CassandraPreparedStatement getToken_;
CassandraPreparedStatement insertKey_;
CassandraPreparedStatement selectKeys_;
CassandraPreparedStatement getBook_;
CassandraPreparedStatement insertAccountTx_;
CassandraPreparedStatement selectAccountTx_;
CassandraPreparedStatement insertLedgerHeader_;
@@ -677,17 +612,10 @@ public:
~CassandraBackend() override
{
BOOST_LOG_TRIVIAL(info) << __func__;
if (open_)
close();
}
std::string
getName()
{
return "cassandra";
}
bool
isOpen()
{
@@ -711,11 +639,6 @@ public:
}
open_ = false;
}
CassandraPreparedStatement const&
getInsertObjectPreparedStatement() const
{
return insertObject_;
}
std::pair<
std::vector<TransactionAndMetadata>,
@@ -904,18 +827,6 @@ public:
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const override;
std::vector<LedgerObject>
fetchLedgerDiff(uint32_t ledgerSequence) const;
std::map<uint32_t, std::vector<LedgerObject>>
fetchLedgerDiffs(std::vector<uint32_t> const& sequences) const;
bool
runIndexer(uint32_t ledgerSequence) const;
bool
isIndexed(uint32_t ledgerSequence) const;
std::optional<uint32_t>
getNextToIndex() const;
bool
writeKeys(
@@ -923,94 +834,10 @@ public:
KeyIndex const& index,
bool isAsync = false) const override;
bool
canFetchBatch()
{
return true;
}
struct ReadCallbackData
{
CassandraBackend const& backend;
ripple::uint256 const& hash;
TransactionAndMetadata& result;
std::condition_variable& cv;
std::atomic_uint32_t& numFinished;
size_t batchSize;
ReadCallbackData(
CassandraBackend const& backend,
ripple::uint256 const& hash,
TransactionAndMetadata& result,
std::condition_variable& cv,
std::atomic_uint32_t& numFinished,
size_t batchSize)
: backend(backend)
, hash(hash)
, result(result)
, cv(cv)
, numFinished(numFinished)
, batchSize(batchSize)
{
}
ReadCallbackData(ReadCallbackData const& other) = default;
};
std::vector<TransactionAndMetadata>
fetchTransactions(
std::vector<ripple::uint256> const& hashes) const override;
void
read(ReadCallbackData& data) const
{
CassandraStatement statement{selectTransaction_};
statement.bindBytes(data.hash);
executeAsyncRead(statement, flatMapReadCallback, data);
}
struct ReadObjectCallbackData
{
CassandraBackend const& backend;
ripple::uint256 const& key;
uint32_t sequence;
Blob& result;
std::condition_variable& cv;
std::atomic_uint32_t& numFinished;
size_t batchSize;
ReadObjectCallbackData(
CassandraBackend const& backend,
ripple::uint256 const& key,
uint32_t sequence,
Blob& result,
std::condition_variable& cv,
std::atomic_uint32_t& numFinished,
size_t batchSize)
: backend(backend)
, key(key)
, sequence(sequence)
, result(result)
, cv(cv)
, numFinished(numFinished)
, batchSize(batchSize)
{
}
ReadObjectCallbackData(ReadObjectCallbackData const& other) = default;
};
void
readObject(ReadObjectCallbackData& data) const
{
CassandraStatement statement{selectObject_};
statement.bindBytes(data.key);
statement.bindInt(data.sequence);
executeAsyncRead(statement, flatMapReadObjectCallback, data);
}
std::vector<Blob>
fetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
@@ -1057,11 +884,6 @@ public:
return ioContext_;
}
friend void
flatMapReadCallback(CassFuture* fut, void* cbData);
friend void
flatMapReadObjectCallback(CassFuture* fut, void* cbData);
inline void
incremementOutstandingRequestCount() const
{