#include <backend/CassandraBackend.h>
#include <backend/DBHelpers.h>
#include <functional>
#include <unordered_map>
/*
namespace std {
template <>
struct hash<ripple::uint256>
{
std::size_t
operator()(const ripple::uint256& k) const noexcept
{
return boost::hash_range(k.begin(), k.end());
}
};
} // namespace std
*/
namespace Backend {
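// Handle the result of an asynchronous write. On error, schedules a retry on
// the backend's io_context with exponential backoff, capped at 2^10 ms
// (roughly one second); requestParams must stay alive until the retry fires.
// On success, marks the request finished.
// @param requestParams struct that holds the request state
// @param fut cassandra future associated with the write
// @param func functor invoked to re-issue the write on retry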
template <class T, class F>
void
processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func)
{
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra ETL insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying in " << wait.count()
<< " milliseconds";
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.getIOContext(),
std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, func](
const boost::system::error_code& error) {
func(requestParams, true);
});
}
else
{
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " Successfully inserted a record";
requestParams.finish();
}
}
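// Trampoline registered with the Cassandra driver as the write callback;
// recovers the typed request state from the opaque pointer and delegates to
// processAsyncWriteResponse.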
template <class T>
void
processAsyncWrite(CassFuture* fut, void* cbData)
{
T& requestParams = *static_cast<T*>(cbData);
processAsyncWriteResponse(requestParams, fut, requestParams.retry);
}
/*
// Process the result of an asynchronous write. Retry on error
// @param fut cassandra future associated with the write
// @param cbData struct that holds the request parameters
void
flatMapWriteCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
auto func = [](auto& params, bool retry) {
params.backend->write(params, retry);
};
processAsyncWriteResponse(requestParams, fut, func);
}
*/
/*
void
retryWriteKey(CassandraBackend::WriteCallbackData& requestParams, bool isRetry)
{
auto const& backend = *requestParams.backend;
if (requestParams.isDeleted)
backend.writeDeletedKey(requestParams, true);
else
backend.writeKey(requestParams, true);
}
void
flatMapWriteKeyCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
processAsyncWriteResponse(requestParams, fut, retryWriteKey);
}
void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeKey(requestParams, true);
});
}
else
{
auto finish = [&backend]() {
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
};
CassandraResult result{cass_future_get_result(fut)};
if (!result)
{
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc
<< ", " << cass_error_desc(rc);
finish();
return;
}
requestParams.createdSequence = result.getUInt32();
backend.writeDeletedKey(requestParams, false);
}
}
*/
/*
void
flatMapWriteTransactionCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteTransactionCallbackData& requestParams =
*static_cast<CassandraBackend::WriteTransactionCallbackData*>(cbData);
auto func = [](auto& params, bool retry) {
params.backend->writeTransaction(params, retry);
};
processAsyncWriteResponse(requestParams, fut, func);
}
void
flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteAccountTxCallbackData& requestParams =
*static_cast<CassandraBackend::WriteAccountTxCallbackData*>(cbData);
auto func = [](auto& params, bool retry) {
params.backend->writeAccountTx(params, retry);
};
processAsyncWriteResponse(requestParams, fut, func);
}
void
flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteLedgerHeaderCallbackData& requestParams =
*static_cast<CassandraBackend::WriteLedgerHeaderCallbackData*>(cbData);
auto func = [](auto& params, bool retry) {
params.backend->writeLedgerHeader(params, retry);
};
processAsyncWriteResponse(requestParams, fut, func);
}
void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteLedgerHashCallbackData& requestParams =
*static_cast<CassandraBackend::WriteLedgerHashCallbackData*>(cbData);
auto func = [](auto& params, bool retry) {
params.backend->writeLedgerHash(params, retry);
};
processAsyncWriteResponse(requestParams, fut, func);
}
*/
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
flatMapReadCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::ReadCallbackData& requestParams =
*static_cast<CassandraBackend::ReadCallbackData*>(cbData);
auto func = [](auto& params) { params.backend.read(params); };
CassandraAsyncResult asyncResult{requestParams, fut, func};
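// a single-byte zero blob marks a timeout; fetchTransactions turns this
// sentinel into a DatabaseTimeout exception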
if (asyncResult.timedOut())
requestParams.result.transaction = {0};
CassandraResult& result = asyncResult.getResult();
if (!!result)
{
requestParams.result = {
result.getBytes(), result.getBytes(), result.getUInt32()};
}
}
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
flatMapReadObjectCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::ReadObjectCallbackData& requestParams =
*static_cast<CassandraBackend::ReadObjectCallbackData*>(cbData);
auto func = [](auto& params) { params.backend.readObject(params); };
CassandraAsyncResult asyncResult{requestParams, fut, func};
if (asyncResult.timedOut())
requestParams.result = {0};
CassandraResult& result = asyncResult.getResult();
if (!!result)
{
requestParams.result = result.getBytes();
}
}
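// State for a single fire-and-forget write. Owns the bound data, the retry
// functor, and the retry count. The object deletes itself in finish() once
// refs drops to zero, so it must be heap-allocated (see
// makeAndExecuteAsyncWrite).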
template <class T, class B>
struct CallbackData
{
CassandraBackend const* backend;
T data;
std::function<void(CallbackData<T, B>&, bool)> retry;
uint32_t currentRetries = 0;
std::atomic<int> refs = 1;
CallbackData(CassandraBackend const* b, T&& d, B bind)
: backend(b), data(std::move(d))
{
retry = [bind, this](auto& params, bool isRetry) {
auto statement = bind(params);
backend->executeAsyncWrite(
statement,
processAsyncWrite<
typename std::remove_reference<decltype(params)>::type>,
params,
isRetry);
};
}
virtual void
start()
{
retry(*this, false);
}
virtual void
finish()
{
backend->finishAsyncWrite();
int remaining = --refs;
if (remaining == 0)
delete this;
}
virtual ~CallbackData()
{
}
};
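// Write state for bulk loads (e.g. writeKeys). Completion is reported through
// caller-owned synchronization (mutex, condition variable, remaining counter)
// instead of the backend-wide throttle, which lets the caller cap how many
// writes are in flight at once.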
template <class T, class B>
struct BulkWriteCallbackData : public CallbackData<T, B>
{
std::mutex& mtx;
std::condition_variable& cv;
std::atomic_int& numRemaining;
BulkWriteCallbackData(
CassandraBackend const* b,
T&& d,
B bind,
std::atomic_int& r,
std::mutex& m,
std::condition_variable& c)
: CallbackData<T, B>(b, std::move(d), bind)
, mtx(m)
, cv(c)
, numRemaining(r)
{
}
void
start() override
{
// isRetry is passed as true so the backend's outstanding-request counter
// is bypassed; bulk writes are throttled by the caller via numRemaining
this->retry(*this, true);
}
void
finish() override
{
{
std::lock_guard lck(mtx);
--numRemaining;
cv.notify_one();
}
}
~BulkWriteCallbackData()
{
}
};
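// Allocate the self-deleting callback state on the heap and issue the write.
// A minimal usage sketch (insertExample_ is a hypothetical prepared
// statement; see doWriteLedgerObject below for a real call site). The bind
// functor turns the stored data back into a statement each time the write is
// issued or retried:
//
//   makeAndExecuteAsyncWrite(
//       this,
//       std::make_tuple(std::move(key), seq),
//       [this](auto& params) {
//           auto& [key, sequence] = params.data;
//           CassandraStatement statement{insertExample_};
//           statement.bindBytes(key);
//           statement.bindInt(sequence);
//           return statement;
//       });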
template <class T, class B>
void
makeAndExecuteAsyncWrite(CassandraBackend const* b, T&& d, B bind)
{
auto* cb = new CallbackData(b, std::move(d), bind);
cb->start();
}
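// Bulk variant: the callback state is owned by the caller through the
// returned shared_ptr rather than deleting itself, since
// BulkWriteCallbackData reports completion through the caller's
// synchronization primitives.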
template <class T, class B>
std::shared_ptr<BulkWriteCallbackData<T, B>>
makeAndExecuteBulkAsyncWrite(
CassandraBackend const* b,
T&& d,
B bind,
std::atomic_int& r,
std::mutex& m,
std::condition_variable& c)
{
auto cb = std::make_shared<BulkWriteCallbackData<T, B>>(
b, std::move(d), bind, r, m, c);
cb->start();
return cb;
}
void
CassandraBackend::doWriteLedgerObject(
std::string&& key,
uint32_t seq,
std::string&& blob,
bool isCreated,
bool isDeleted,
std::optional<ripple::uint256>&& book) const
{
BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra";
makeAndExecuteAsyncWrite(
this,
std::make_tuple(std::move(key), seq, std::move(blob)),
[this](auto& params) {
auto& [key, sequence, blob] = params.data;
CassandraStatement statement{insertObject_};
statement.bindBytes(key);
statement.bindInt(sequence);
statement.bindBytes(blob);
return statement;
});
}
void
CassandraBackend::writeLedger(
ripple::LedgerInfo const& ledgerInfo,
std::string&& header,
bool isFirst) const
{
makeAndExecuteAsyncWrite(
this,
std::make_tuple(ledgerInfo.seq, std::move(header)),
[this](auto& params) {
auto& [sequence, header] = params.data;
CassandraStatement statement{insertLedgerHeader_};
statement.bindInt(sequence);
statement.bindBytes(header);
return statement;
});
makeAndExecuteAsyncWrite(
this,
std::make_tuple(ledgerInfo.hash, ledgerInfo.seq),
[this](auto& params) {
auto& [hash, sequence] = params.data;
CassandraStatement statement{insertLedgerHash_};
statement.bindBytes(hash);
statement.bindInt(sequence);
return statement;
});
ledgerSequence_ = ledgerInfo.seq;
isFirstLedger_ = isFirst;
}
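// Write one account_tx row per (account, transaction) pair; the clustering
// key is the (ledger_sequence, transaction_index) tuple.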
void
CassandraBackend::writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const
{
for (auto& record : data)
{
for (auto& account : record.accounts)
{
makeAndExecuteAsyncWrite(
this,
std::make_tuple(
std::move(account),
record.ledgerSequence,
record.transactionIndex,
record.txHash),
[this](auto& params) {
CassandraStatement statement(insertAccountTx_);
auto& [account, lgrSeq, txnIdx, hash] = params.data;
statement.bindBytes(account);
statement.bindIntTuple(lgrSeq, txnIdx);
statement.bindBytes(hash);
return statement;
});
}
}
}
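// A transaction is written twice: once into ledger_transactions keyed by
// ledger sequence (used by fetchAllTransactionHashesInLedger) and once into
// transactions keyed by hash.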
void
CassandraBackend::writeTransaction(
std::string&& hash,
uint32_t seq,
std::string&& transaction,
std::string&& metadata) const
{
BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra";
makeAndExecuteAsyncWrite(
this, std::make_pair(seq, hash), [this](auto& params) {
CassandraStatement statement{insertLedgerTransaction_};
statement.bindInt(params.data.first);
statement.bindBytes(params.data.second);
return statement;
});
makeAndExecuteAsyncWrite(
this,
std::make_tuple(
std::move(hash), seq, std::move(transaction), std::move(metadata)),
[this](auto& params) {
CassandraStatement statement{insertTransaction_};
auto& [hash, sequence, transaction, metadata] = params.data;
statement.bindBytes(hash);
statement.bindInt(sequence);
statement.bindBytes(transaction);
statement.bindBytes(metadata);
return statement;
});
}
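// The ledger_range table holds at most two rows, keyed by is_latest. The
// first row read seeds both bounds and the second, if present, supplies the
// other bound; the swap below normalizes the pair in case the rows come back
// in the opposite order.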
std::optional<LedgerRange>
CassandraBackend::fetchLedgerRange() const
{
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
CassandraStatement statement{selectLedgerRange_};
CassandraResult result = executeSyncRead(statement);
if (!result)
{
BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows";
return {};
}
LedgerRange range;
range.maxSequence = range.minSequence = result.getUInt32();
if (result.nextRow())
{
range.maxSequence = result.getUInt32();
}
if (range.minSequence > range.maxSequence)
{
std::swap(range.minSequence, range.maxSequence);
}
return range;
}
std::vector<TransactionAndMetadata>
CassandraBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const
{
auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence);
return fetchTransactions(hashes);
}
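// Fan out one async read per hash and block until every callback has
// reported completion through the shared condition variable. A transaction
// left as the single-byte {0} sentinel means its read timed out, which is
// surfaced to the caller as DatabaseTimeout.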
std::vector<TransactionAndMetadata>
CassandraBackend::fetchTransactions(
std::vector<ripple::uint256> const& hashes) const
{
std::size_t const numHashes = hashes.size();
std::atomic_uint32_t numFinished = 0;
std::condition_variable cv;
std::mutex mtx;
std::vector<TransactionAndMetadata> results{numHashes};
std::vector<std::shared_ptr<ReadCallbackData>> cbs;
cbs.reserve(numHashes);
auto start = std::chrono::system_clock::now();
for (std::size_t i = 0; i < hashes.size(); ++i)
{
cbs.push_back(std::make_shared<ReadCallbackData>(
*this, hashes[i], results[i], cv, numFinished, numHashes));
read(*cbs[i]);
}
assert(results.size() == cbs.size());
std::unique_lock<std::mutex> lck(mtx);
cv.wait(
lck, [&numFinished, &numHashes]() { return numFinished == numHashes; });
auto end = std::chrono::system_clock::now();
for (auto const& res : results)
{
if (res.transaction.size() == 1 && res.transaction[0] == 0)
throw DatabaseTimeout();
}
BOOST_LOG_TRIVIAL(debug)
<< "Fetched " << numHashes << " transactions from Cassandra in "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count()
<< " milliseconds";
return results;
}
std::vector<ripple::uint256>
CassandraBackend::fetchAllTransactionHashesInLedger(
uint32_t ledgerSequence) const
{
CassandraStatement statement{selectAllTransactionHashesInLedger_};
statement.bindInt(ledgerSequence);
auto start = std::chrono::system_clock::now();
CassandraResult result = executeSyncRead(statement);
auto end = std::chrono::system_clock::now();
if (!result)
{
BOOST_LOG_TRIVIAL(error)
<< __func__
<< " - no rows . ledger = " << std::to_string(ledgerSequence);
return {};
}
std::vector<ripple::uint256> hashes;
do
{
hashes.push_back(result.getUInt256());
} while (result.nextRow());
BOOST_LOG_TRIVIAL(debug)
<< "Fetched " << hashes.size()
<< " transaction hashes from Cassandra in "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count()
<< " milliseconds";
return hashes;
}
struct ReadDiffCallbackData
{
CassandraBackend const& backend;
uint32_t sequence;
std::vector<LedgerObject>& result;
std::condition_variable& cv;
std::atomic_uint32_t& numFinished;
size_t batchSize;
ReadDiffCallbackData(
CassandraBackend const& backend,
uint32_t sequence,
std::vector<LedgerObject>& result,
std::condition_variable& cv,
std::atomic_uint32_t& numFinished,
size_t batchSize)
: backend(backend)
, sequence(sequence)
, result(result)
, cv(cv)
, numFinished(numFinished)
, batchSize(batchSize)
{
}
};
void
flatMapReadDiffCallback(CassFuture* fut, void* cbData);
void
readDiff(ReadDiffCallbackData& data)
{
CassandraStatement statement{
data.backend.getSelectLedgerDiffPreparedStatement()};
statement.bindInt(data.sequence);
data.backend.executeAsyncRead(statement, flatMapReadDiffCallback, data);
}
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
flatMapReadDiffCallback(CassFuture* fut, void* cbData)
{
ReadDiffCallbackData& requestParams =
*static_cast<ReadDiffCallbackData*>(cbData);
auto func = [](auto& params) { readDiff(params); };
CassandraAsyncResult asyncResult{requestParams, fut, func, true};
CassandraResult& result = asyncResult.getResult();
if (!!result)
{
do
{
requestParams.result.push_back(
{result.getUInt256(), result.getBytes()});
} while (result.nextRow());
}
}
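// Fetch the diffs for several sequences concurrently; results are keyed by
// sequence. Like the other bulk reads, completion is signalled through a
// shared condition variable.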
std::map<uint32_t, std::vector<LedgerObject>>
CassandraBackend::fetchLedgerDiffs(std::vector<uint32_t> const& sequences) const
{
std::atomic_uint32_t numFinished = 0;
std::condition_variable cv;
std::mutex mtx;
std::map<uint32_t, std::vector<LedgerObject>> results;
std::vector<std::shared_ptr<ReadDiffCallbackData>> cbs;
cbs.reserve(sequences.size());
for (std::size_t i = 0; i < sequences.size(); ++i)
{
cbs.push_back(std::make_shared<ReadDiffCallbackData>(
*this,
sequences[i],
results[sequences[i]],
cv,
numFinished,
sequences.size()));
readDiff(*cbs[i]);
}
assert(results.size() == cbs.size());
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numFinished, &sequences]() {
return numFinished == sequences.size();
});
return results;
}
std::vector<LedgerObject>
CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
{
CassandraStatement statement{selectLedgerDiff_};
statement.bindInt(ledgerSequence);
auto start = std::chrono::system_clock::now();
CassandraResult result = executeSyncRead(statement);
auto mid = std::chrono::system_clock::now();
if (!result)
return {};
std::vector<LedgerObject> objects;
do
{
objects.push_back({result.getUInt256(), result.getBytes()});
} while (result.nextRow());
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Fetched diff. Fetch time = "
<< std::to_string((mid - start).count() / 1000000000.0)
<< " . total time = "
<< std::to_string((end - start).count() / 1000000000.0);
return objects;
}
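// Page through the ledger state as of ledgerSequence. Keys are read from the
// keys table for the key index covering this ledger; limit + 1 keys are
// requested, and if at least limit come back, page.cursor is set just past
// the last key returned. The corresponding objects are then fetched at the
// requested sequence, and empty blobs (deleted objects) are filtered out.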
LedgerPage
CassandraBackend::doFetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const
{
auto index = getKeyIndexOfSeq(ledgerSequence);
if (!index)
return {};
LedgerPage page;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " ledgerSequence = " << std::to_string(ledgerSequence)
<< " index = " << std::to_string(index->keyIndex);
if (cursor)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - Cursor = " << ripple::strHex(*cursor);
CassandraStatement statement{selectKeys_};
statement.bindInt(index->keyIndex);
if (cursor)
statement.bindBytes(*cursor);
else
{
ripple::uint256 zero;
statement.bindBytes(zero);
}
statement.bindUInt(limit + 1);
CassandraResult result = executeSyncRead(statement);
if (!!result)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - got keys - size = " << result.numRows();
std::vector<ripple::uint256> keys;
do
{
keys.push_back(result.getUInt256());
} while (result.nextRow());
if (keys.size() && keys.size() >= limit)
{
page.cursor = keys.back();
++(*page.cursor);
}
auto objects = fetchLedgerObjects(keys, ledgerSequence);
if (objects.size() != keys.size())
throw std::runtime_error("Mismatch in size of objects and keys");
if (page.cursor)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Cursor = " << ripple::strHex(*page.cursor);
for (size_t i = 0; i < objects.size(); ++i)
{
auto& obj = objects[i];
auto& key = keys[i];
if (obj.size())
{
page.objects.push_back({std::move(key), std::move(obj)});
}
}
if (!cursor && (!keys.size() || !keys[0].isZero()))
page.warning = "Data may be incomplete";
return page;
}
if (!cursor)
return {{}, {}, "Data may be incomplete"};
return {};
}
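// Fan out one async read per key and wait for completion, mirroring
// fetchTransactions. A single-byte {0} blob is the timeout sentinel and is
// converted into DatabaseTimeout.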
std::vector<Blob>
CassandraBackend::fetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const
{
std::size_t const numKeys = keys.size();
BOOST_LOG_TRIVIAL(trace)
<< "Fetching " << numKeys << " records from Cassandra";
std::atomic_uint32_t numFinished = 0;
std::condition_variable cv;
std::mutex mtx;
std::vector<Blob> results{numKeys};
std::vector<std::shared_ptr<ReadObjectCallbackData>> cbs;
cbs.reserve(numKeys);
for (std::size_t i = 0; i < keys.size(); ++i)
{
cbs.push_back(std::make_shared<ReadObjectCallbackData>(
*this, keys[i], sequence, results[i], cv, numFinished, numKeys));
readObject(*cbs[i]);
}
assert(results.size() == cbs.size());
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numFinished, &numKeys]() { return numFinished == numKeys; });
for (auto const& res : results)
{
if (res.size() == 1 && res[0] == 0)
throw DatabaseTimeout();
}
BOOST_LOG_TRIVIAL(trace)
<< "Fetched " << numKeys << " records from Cassandra";
return results;
}
struct WriteBookCallbackData
{
CassandraBackend const& backend;
ripple::uint256 book;
ripple::uint256 offerKey;
uint32_t ledgerSequence;
std::condition_variable& cv;
std::atomic_uint32_t& numOutstanding;
std::mutex& mtx;
uint32_t currentRetries = 0;
WriteBookCallbackData(
CassandraBackend const& backend,
ripple::uint256 const& book,
ripple::uint256 const& offerKey,
uint32_t ledgerSequence,
std::condition_variable& cv,
std::mutex& mtx,
std::atomic_uint32_t& numOutstanding)
: backend(backend)
, book(book)
, offerKey(offerKey)
, ledgerSequence(ledgerSequence)
, cv(cv)
, mtx(mtx)
, numOutstanding(numOutstanding)
{
}
};
void
writeBookCallback(CassFuture* fut, void* cbData);
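// Issue a single book write. The 32-byte book directory key is split into
// its 24-byte base and its final 8 bytes (which, in XRPL book directories,
// encode the quality) to match the books table layout.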
void
writeBook(WriteBookCallbackData& cb)
{
CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()};
statement.bindBytes(cb.book.data(), 24);
statement.bindInt(cb.ledgerSequence);
statement.bindBytes(cb.book.data() + 24, 8);
statement.bindBytes(cb.offerKey);
// Passing isRetry as true bypasses incrementing numOutstanding
cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true);
}
void
writeBookCallback(CassFuture* fut, void* cbData)
{
WriteBookCallbackData& requestParams =
*static_cast<WriteBookCallbackData*>(cbData);
CassandraBackend const& backend = requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert book error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying in " << wait.count()
<< " milliseconds";
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.getIOContext(),
std::chrono::steady_clock::now() + wait);
timer->async_wait(
[timer, &requestParams](const boost::system::error_code& error) {
writeBook(requestParams);
});
}
else
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book";
{
std::lock_guard lck(requestParams.mtx);
--requestParams.numOutstanding;
requestParams.cv.notify_one();
}
}
}
struct WriteKeyCallbackData
{
CassandraBackend const& backend;
ripple::uint256 key;
uint32_t ledgerSequence;
std::condition_variable& cv;
std::atomic_uint32_t& numRemaining;
std::mutex& mtx;
uint32_t currentRetries = 0;
WriteKeyCallbackData(
CassandraBackend const& backend,
ripple::uint256 const& key,
uint32_t ledgerSequence,
std::condition_variable& cv,
std::mutex& mtx,
std::atomic_uint32_t& numRemaining)
: backend(backend)
, key(key)
, ledgerSequence(ledgerSequence)
, cv(cv)
, mtx(mtx)
, numRemaining(numRemaining)
{
}
};
struct OnlineDeleteCallbackData
{
CassandraBackend const& backend;
ripple::uint256 key;
uint32_t ledgerSequence;
std::vector<unsigned char> object;
std::condition_variable& cv;
std::atomic_uint32_t& numOutstanding;
std::mutex& mtx;
uint32_t currentRetries = 0;
OnlineDeleteCallbackData(
CassandraBackend const& backend,
ripple::uint256&& key,
uint32_t ledgerSequence,
std::vector<unsigned char>&& object,
std::condition_variable& cv,
std::mutex& mtx,
std::atomic_uint32_t& numOutstanding)
: backend(backend)
, key(std::move(key))
, ledgerSequence(ledgerSequence)
, object(std::move(object))
, cv(cv)
, mtx(mtx)
, numOutstanding(numOutstanding)
{
}
};
void
onlineDeleteCallback(CassFuture* fut, void* cbData);
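// Re-write the most recent version of an object, keyed at minLedger. Because
// the objects table carries a default TTL, refreshing the row this way keeps
// live state from expiring while superseded versions age out.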
void
onlineDelete(OnlineDeleteCallbackData& cb)
{
{
CassandraStatement statement{
cb.backend.getInsertObjectPreparedStatement()};
statement.bindBytes(cb.key);
statement.bindInt(cb.ledgerSequence);
statement.bindBytes(cb.object);
cb.backend.executeAsyncWrite(statement, onlineDeleteCallback, cb, true);
}
}
void
onlineDeleteCallback(CassFuture* fut, void* cbData)
{
OnlineDeleteCallbackData& requestParams =
*static_cast<OnlineDeleteCallbackData*>(cbData);
CassandraBackend const& backend = requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra online delete error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying in " << wait.count()
<< " milliseconds";
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.getIOContext(),
std::chrono::steady_clock::now() + wait);
timer->async_wait(
[timer, &requestParams](const boost::system::error_code& error) {
onlineDelete(requestParams);
});
}
else
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully wrote an object";
{
std::lock_guard lck(requestParams.mtx);
--requestParams.numOutstanding;
requestParams.cv.notify_one();
}
}
}
void
writeKeyCallback(CassFuture* fut, void* cbData);
void
writeKey(WriteKeyCallbackData& cb)
{
CassandraStatement statement{cb.backend.getInsertKeyPreparedStatement()};
statement.bindInt(cb.ledgerSequence);
statement.bindBytes(cb.key);
// Passing isRetry as true bypasses incrementing numOutstanding
cb.backend.executeAsyncWrite(statement, writeKeyCallback, cb, true);
}
void
writeKeyCallback(CassFuture* fut, void* cbData)
{
WriteKeyCallbackData& requestParams =
*static_cast<WriteKeyCallbackData*>(cbData);
CassandraBackend const& backend = requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert key error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying in " << wait.count()
<< " milliseconds";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.getIOContext(),
std::chrono::steady_clock::now() + wait);
timer->async_wait(
[timer, &requestParams](const boost::system::error_code& error) {
writeKey(requestParams);
});
}
else
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a key";
{
std::lock_guard lck(requestParams.mtx);
--requestParams.numRemaining;
requestParams.cv.notify_one();
}
}
}
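// Bulk-write one row per key into the keys table for the given key index.
// Writes are issued through makeAndExecuteBulkAsyncWrite and throttled by
// waiting on the condition variable until the number of requests in flight
// drops below concurrentLimit; the function returns once every write has
// completed.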
bool
CassandraBackend::writeKeys(
std::unordered_set<ripple::uint256> const& keys,
KeyIndex const& index,
bool isAsync) const
{
auto bind = [this](auto& params) {
auto& [lgrSeq, key] = params.data;
CassandraStatement statement{insertKey_};
statement.bindInt(lgrSeq);
statement.bindBytes(key);
return statement;
};
std::atomic_int numRemaining = keys.size();
std::condition_variable cv;
std::mutex mtx;
std::vector<std::shared_ptr<BulkWriteCallbackData<
std::pair<uint32_t, ripple::uint256>,
typename std::remove_reference<decltype(bind)>::type>>>
cbs;
cbs.reserve(keys.size());
uint32_t concurrentLimit =
isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Ledger = " << std::to_string(index.keyIndex)
<< " . num keys = " << std::to_string(keys.size())
<< " . concurrentLimit = "
<< std::to_string(indexerMaxRequestsOutstanding);
uint32_t numSubmitted = 0;
for (auto& key : keys)
{
cbs.push_back(makeAndExecuteBulkAsyncWrite(
this,
std::make_pair(index.keyIndex, key),
bind,
numRemaining,
mtx,
cv));
++numSubmitted;
BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request";
std::unique_lock<std::mutex> lck(mtx);
BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
cv.wait(lck, [&numRemaining, numSubmitted, concurrentLimit, &keys]() {
BOOST_LOG_TRIVIAL(trace) << std::to_string(numSubmitted) << " "
<< std::to_string(numRemaining) << " "
<< std::to_string(keys.size()) << " "
<< std::to_string(concurrentLimit);
// numSubmitted is the number submitted. keys.size() - numRemaining
// is the number completed. The difference is the number
// outstanding.
return (numSubmitted - (keys.size() - numRemaining)) <
concurrentLimit;
});
if (numSubmitted % 100000 == 0)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Submitted " << std::to_string(numSubmitted)
<< " write requests. Completed "
<< (keys.size() - numRemaining);
}
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
return true;
}
bool
CassandraBackend::isIndexed(uint32_t ledgerSequence) const
{
return false;
/*
auto rng = fetchLedgerRange();
if (!rng)
return false;
if (ledgerSequence != rng->minSequence &&
ledgerSequence != (ledgerSequence >> indexerShift_ << indexerShift_))
ledgerSequence = ((ledgerSequence >> indexerShift_) << indexerShift_) +
(1 << indexerShift_);
CassandraStatement statement{selectKeys_};
statement.bindInt(ledgerSequence);
ripple::uint256 zero;
statement.bindBytes(zero);
statement.bindUInt(1);
CassandraResult result = executeSyncRead(statement);
return !!result;
*/
}
std::optional<uint32_t>
CassandraBackend::getNextToIndex() const
{
return {};
/*
auto rng = fetchLedgerRange();
if (!rng)
return {};
uint32_t cur = rng->minSequence;
while (isIndexed(cur))
{
cur = ((cur >> indexerShift_) << indexerShift_) + (1 << indexerShift_);
}
return cur;
*/
}
bool
CassandraBackend::runIndexer(uint32_t ledgerSequence) const
{
return false;
/*
auto start = std::chrono::system_clock::now();
constexpr uint32_t limit = 2048;
std::unordered_set<ripple::uint256> keys;
std::unordered_map<ripple::uint256, ripple::uint256> offers;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
books;
std::optional<ripple::uint256> cursor;
size_t numOffers = 0;
uint32_t base = ledgerSequence;
auto rng = fetchLedgerRange();
if (base != rng->minSequence)
{
base = (base >> indexerShift_) << indexerShift_;
base -= (1 << indexerShift_);
if (base < rng->minSequence)
base = rng->minSequence;
}
BOOST_LOG_TRIVIAL(info)
<< __func__ << " base = " << std::to_string(base)
<< " next to index = " << std::to_string(ledgerSequence);
while (true)
{
try
{
auto [objects, curCursor] = fetchLedgerPage(cursor, base, limit);
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
cursor = curCursor;
for (auto& obj : objects)
{
if (isOffer(obj.blob))
{
auto bookDir = getBook(obj.blob);
books[bookDir].insert(obj.key);
offers[obj.key] = bookDir;
++numOffers;
}
keys.insert(std::move(obj.key));
if (keys.size() % 100000 == 0)
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Fetched "
<< std::to_string(keys.size()) << "keys";
}
if (!cursor)
break;
}
catch (DatabaseTimeout const& e)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Database timeout fetching keys";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
auto mid = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Fetched all keys from ledger " << std::to_string(base)
<< " . num keys = " << keys.size() << " num books = " << books.size()
<< " num offers = " << numOffers << " . Took "
<< (mid - start).count() / 1000000000.0;
if (base == ledgerSequence)
{
BOOST_LOG_TRIVIAL(info) << __func__ << "Writing keys";
writeKeys(keys, ledgerSequence);
writeBooks(books, ledgerSequence, numOffers);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Wrote all keys from ledger "
<< std::to_string(ledgerSequence) << " . num keys = " << keys.size()
<< " . Took " << (end - mid).count() / 1000000000.0
<< ". Entire operation took "
<< (end - start).count() / 1000000000.0;
}
else
{
writeBooks(books, base, numOffers);
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Wrote books. Skipping writing keys";
}
uint32_t prevLedgerSequence = base;
uint32_t nextLedgerSequence =
((prevLedgerSequence >> indexerShift_) << indexerShift_);
BOOST_LOG_TRIVIAL(info)
<< __func__ << " next base = " << std::to_string(nextLedgerSequence);
nextLedgerSequence += (1 << indexerShift_);
BOOST_LOG_TRIVIAL(info)
<< __func__ << " next = " << std::to_string(nextLedgerSequence);
while (true)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Processing diffs. nextLedger = "
<< std::to_string(nextLedgerSequence);
auto rng = fetchLedgerRange();
if (rng->maxSequence < nextLedgerSequence)
break;
start = std::chrono::system_clock::now();
for (size_t i = prevLedgerSequence; i <= nextLedgerSequence; i += 256)
{
auto start2 = std::chrono::system_clock::now();
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>>
booksDeleted;
size_t numOffersDeleted = 0;
// Get the diff and update keys
std::vector<LedgerObject> objs;
std::vector<uint32_t> sequences(256, 0);
std::iota(sequences.begin(), sequences.end(), i + 1);
auto diffs = fetchLedgerDiffs(sequences);
for (auto const& diff : diffs)
{
for (auto const& obj : diff.second)
{
// remove deleted keys
if (obj.blob.size() == 0)
{
keys.erase(obj.key);
if (offers.count(obj.key) > 0)
{
auto book = offers[obj.key];
if (booksDeleted[book].insert(obj.key).second)
++numOffersDeleted;
offers.erase(obj.key);
}
}
else
{
// insert other keys. keys is a set, so this is a
// noop if obj.key is already in keys
keys.insert(obj.key);
// if the object is an offer, add to books
if (isOffer(obj.blob))
{
auto book = getBook(obj.blob);
if (books[book].insert(obj.key).second)
++numOffers;
offers[obj.key] = book;
}
}
}
}
if (sequences.back() % 256 != 0)
{
BOOST_LOG_TRIVIAL(error)
<< __func__
<< " back : " << std::to_string(sequences.back())
<< " front : " << std::to_string(sequences.front())
<< " size : " << std::to_string(sequences.size());
throw std::runtime_error(
"Last sequence is not divisible by 256");
}
for (auto& book : booksDeleted)
{
for (auto& offerKey : book.second)
{
if (books[book.first].erase(offerKey))
--numOffers;
}
}
writeBooks(books, sequences.back(), numOffers);
writeBooks(booksDeleted, sequences.back(), numOffersDeleted);
auto mid = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info) << __func__ << " Fetched 256 diffs. Took "
<< (mid - start2).count() / 1000000000.0;
}
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Fetched all from diffs "
<< std::to_string(nextLedgerSequence)
<< " shift width = " << std::to_string(indexerShift_)
<< ". num keys = " << keys.size() << " . Took "
<< (end - start).count() / 1000000000.0
<< " prev ledger = " << std::to_string(prevLedgerSequence);
writeKeys(keys, nextLedgerSequence);
prevLedgerSequence = nextLedgerSequence;
nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_);
}
return true;
*/
}
bool
CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
{
// Keep only the latest numLedgersToKeep ledgers. Rather than deleting rows
// directly, walk the full state as of minLedger and re-write each object to
// refresh its TTL, so unchanging records survive while superseded versions
// expire. The TTL itself is set in open(): ledgers close roughly every 4
// seconds, and the configured value is doubled to leave a window for these
// refreshes. Finally, the lower bound of ledger_range is advanced.
auto rng = fetchLedgerRangeNoThrow();
if (!rng)
return false;
if (rng->maxSequence < numLedgersToKeep)
return false;
uint32_t minLedger = rng->maxSequence - numLedgersToKeep;
if (minLedger <= rng->minSequence)
return false;
std::condition_variable cv;
std::mutex mtx;
std::vector<std::shared_ptr<OnlineDeleteCallbackData>> cbs;
uint32_t concurrentLimit = 10;
std::atomic_uint32_t numOutstanding = 0;
// iterate through latest ledger, updating TTL
std::optional<ripple::uint256> cursor;
while (true)
{
try
{
auto [objects, curCursor, warning] =
fetchLedgerPage(cursor, minLedger, 256);
if (warning)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__
<< " online delete running but flag ledger is not complete";
std::this_thread::sleep_for(std::chrono::seconds(10));
continue;
}
for (auto& obj : objects)
{
++numOutstanding;
cbs.push_back(std::make_shared<OnlineDeleteCallbackData>(
*this,
std::move(obj.key),
minLedger,
std::move(obj.blob),
cv,
mtx,
numOutstanding));
onlineDelete(*cbs.back());
std::unique_lock<std::mutex> lck(mtx);
BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
cv.wait(lck, [&numOutstanding, concurrentLimit]() {
return numOutstanding < concurrentLimit;
});
}
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
cursor = curCursor;
if (!cursor)
break;
}
catch (DatabaseTimeout const& e)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Database timeout fetching keys";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; });
CassandraStatement statement{deleteLedgerRange_};
statement.bindInt(minLedger);
executeSyncWrite(statement);
// update ledger_range
return true;
}
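// Open the backend: configure the cluster from config_, create the keyspace
// and tables if needed, and prepare all statements. Recognized config fields
// (read via getString/getInt below): secure_connect_bundle, contact_points,
// port, username, password, threads, max_requests_outstanding, certfile,
// keyspace, replication_factor, table_prefix, ttl.
//
// A minimal config sketch (field names as read below; values illustrative):
//
//   {
//       "contact_points": "127.0.0.1",
//       "port": 9042,
//       "keyspace": "oceand",
//       "table_prefix": "",
//       "replication_factor": 3,
//       "ttl": 0
//   }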
void
CassandraBackend::open(bool readOnly)
{
auto getString = [this](std::string const& field) -> std::string {
if (config_.contains(field))
{
auto jsonStr = config_[field].as_string();
return {jsonStr.c_str(), jsonStr.size()};
}
return {""};
};
auto getInt = [this](std::string const& field) -> std::optional<int> {
if (config_.contains(field) && config_.at(field).is_int64())
return config_[field].as_int64();
return {};
};
if (open_)
{
assert(false);
BOOST_LOG_TRIVIAL(error) << "database is already open";
return;
}
BOOST_LOG_TRIVIAL(info) << "Opening Cassandra Backend";
std::lock_guard<std::mutex> lock(mutex_);
CassCluster* cluster = cass_cluster_new();
if (!cluster)
throw std::runtime_error("nodestore:: Failed to create CassCluster");
std::string secureConnectBundle = getString("secure_connect_bundle");
if (!secureConnectBundle.empty())
{
/* Setup driver to connect to the cloud using the secure connection
* bundle */
if (cass_cluster_set_cloud_secure_connection_bundle(
cluster, secureConnectBundle.c_str()) != CASS_OK)
{
BOOST_LOG_TRIVIAL(error) << "Unable to configure cloud using the "
"secure connection bundle: "
<< secureConnectBundle;
throw std::runtime_error(
"nodestore: Failed to connect using secure connection "
"bundle");
}
}
else
{
std::string contact_points = getString("contact_points");
if (contact_points.empty())
{
throw std::runtime_error(
"nodestore: Missing contact_points in Cassandra config");
}
CassError rc =
cass_cluster_set_contact_points(cluster, contact_points.c_str());
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting Cassandra contact_points: "
<< contact_points << ", result: " << rc << ", "
<< cass_error_desc(rc);
throw std::runtime_error(ss.str());
}
auto port = getInt("port");
if (port)
{
rc = cass_cluster_set_port(cluster, *port);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting Cassandra port: " << *port
<< ", result: " << rc << ", " << cass_error_desc(rc);
throw std::runtime_error(ss.str());
}
}
}
cass_cluster_set_token_aware_routing(cluster, cass_true);
CassError rc =
cass_cluster_set_protocol_version(cluster, CASS_PROTOCOL_VERSION_V4);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting cassandra protocol version: "
<< ", result: " << rc << ", " << cass_error_desc(rc);
throw std::runtime_error(ss.str());
}
std::string username = getString("username");
if (username.size())
{
BOOST_LOG_TRIVIAL(debug)
<< "user = " << username.c_str()
<< " password = " << getString("password").c_str();
cass_cluster_set_credentials(
cluster, username.c_str(), getString("password").c_str());
}
int threads = getInt("threads") ? *getInt("threads")
: std::thread::hardware_concurrency();
rc = cass_cluster_set_num_threads_io(cluster, threads);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting Cassandra io threads to " << threads
<< ", result: " << rc << ", " << cass_error_desc(rc);
throw std::runtime_error(ss.str());
}
if (getInt("max_requests_outstanding"))
maxRequestsOutstanding = *getInt("max_requests_outstanding");
cass_cluster_set_request_timeout(cluster, 10000);
rc = cass_cluster_set_queue_size_io(
cluster,
maxRequestsOutstanding); // This number needs to scale w/ the
// number of request per sec
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting Cassandra max core connections per "
"host"
<< ", result: " << rc << ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
throw std::runtime_error(ss.str());
}
std::string certfile = getString("certfile");
if (certfile.size())
{
std::ifstream fileStream(
boost::filesystem::path(certfile).string(), std::ios::in);
if (!fileStream)
{
std::stringstream ss;
ss << "opening config file " << certfile;
throw std::system_error(errno, std::generic_category(), ss.str());
}
std::string cert(
std::istreambuf_iterator<char>{fileStream},
std::istreambuf_iterator<char>{});
if (fileStream.bad())
{
std::stringstream ss;
ss << "reading config file " << certfile;
throw std::system_error(errno, std::generic_category(), ss.str());
}
CassSsl* context = cass_ssl_new();
cass_ssl_set_verify_flags(context, CASS_SSL_VERIFY_NONE);
rc = cass_ssl_add_trusted_cert(context, cert.c_str());
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting Cassandra ssl context: " << rc
<< ", " << cass_error_desc(rc);
throw std::runtime_error(ss.str());
}
cass_cluster_set_ssl(cluster, context);
cass_ssl_free(context);
}
std::string keyspace = getString("keyspace");
if (keyspace.empty())
{
BOOST_LOG_TRIVIAL(warning)
<< "No keyspace specified. Using keyspace oceand";
keyspace = "oceand";
}
int rf = getInt("replication_factor") ? *getInt("replication_factor") : 3;
std::string tablePrefix = getString("table_prefix");
if (tablePrefix.empty())
{
BOOST_LOG_TRIVIAL(warning) << "Table prefix is empty";
}
cass_cluster_set_connect_timeout(cluster, 10000);
int ttl = getInt("ttl") ? *getInt("ttl") * 2 : 0;
// keys are flushed once per key index (2^keyShift ledgers at roughly 4
// seconds each); doubled like ttl, then rounded up to at least ttl
int keysTtl = (ttl != 0 ? pow(2, indexer_.getKeyShift()) * 4 * 2 : 0);
int incr = keysTtl;
while (keysTtl < ttl)
{
keysTtl += incr;
}
int booksTtl = 0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " setting ttl to " << std::to_string(ttl)
<< " , books ttl to " << std::to_string(booksTtl) << " , keys ttl to "
<< std::to_string(keysTtl);
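// Run a one-off, unprepared statement. CASS_ERROR_SERVER_INVALID_QUERY is
// tolerated (presumably so re-running DDL such as the CREATE INDEX below,
// which lacks IF NOT EXISTS, does not abort setup); any other error makes
// the setup loop retry.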
auto executeSimpleStatement = [this](std::string const& query) {
CassStatement* statement = makeStatement(query.c_str(), 0);
CassFuture* fut = cass_session_execute(session_.get(), statement);
CassError rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error executing simple statement: " << rc << ", "
<< cass_error_desc(rc) << " - " << query;
BOOST_LOG_TRIVIAL(error) << ss.str();
return false;
}
return true;
};
CassStatement* statement;
CassFuture* fut;
bool setupSessionAndTable = false;
while (!setupSessionAndTable)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
session_.reset(cass_session_new());
assert(session_);
fut = cass_session_connect_keyspace(
session_.get(), cluster, keyspace.c_str());
rc = cass_future_error_code(fut);
cass_future_free(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error connecting Cassandra session keyspace: "
<< rc << ", " << cass_error_desc(rc)
<< ", trying to create it ourselves";
BOOST_LOG_TRIVIAL(error) << ss.str();
// if the keyspace doesn't exist, try to create it
session_.reset(cass_session_new());
fut = cass_session_connect(session_.get(), cluster);
rc = cass_future_error_code(fut);
cass_future_free(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error connecting Cassandra session at all: "
<< rc << ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
}
else
{
std::stringstream query;
query << "CREATE KEYSPACE IF NOT EXISTS " << keyspace
<< " WITH replication = {'class': 'SimpleStrategy', "
"'replication_factor': '"
<< std::to_string(rf) << "'} AND durable_writes = true";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "USE " << keyspace;
if (!executeSimpleStatement(query.str()))
continue;
}
continue;
}
std::stringstream query;
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "objects"
<< " ( key blob, sequence bigint, object blob, PRIMARY "
"KEY(key, "
"sequence)) WITH CLUSTERING ORDER BY (sequence DESC) AND"
<< " default_time_to_live = " << std::to_string(ttl);
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "objects"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
<< " ( hash blob PRIMARY KEY, ledger_sequence bigint, "
"transaction "
"blob, metadata blob)"
<< " WITH default_time_to_live = " << std::to_string(ttl);
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix
<< "ledger_transactions"
<< " ( ledger_sequence bigint, hash blob, PRIMARY "
"KEY(ledger_sequence, hash))"
<< " WITH default_time_to_live = " << std::to_string(ttl);
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "transactions"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "CREATE INDEX ON " << tablePrefix
<< "transactions(ledger_sequence)";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix
<< "transactions WHERE ledger_sequence = 1"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys"
<< " ( sequence bigint, key blob, PRIMARY KEY "
"(sequence, key))"
" WITH default_time_to_live = "
<< std::to_string(keysTtl);
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "keys"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx"
<< " ( account blob, seq_idx tuple<bigint, bigint>, "
" hash blob, "
"PRIMARY KEY "
"(account, seq_idx)) WITH "
"CLUSTERING ORDER BY (seq_idx desc)"
<< " AND default_time_to_live = " << std::to_string(ttl);
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "account_tx"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers"
<< " ( sequence bigint PRIMARY KEY, header blob )"
<< " WITH default_time_to_live = " << std::to_string(ttl);
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "ledgers"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes"
<< " (hash blob PRIMARY KEY, sequence bigint)"
<< " WITH default_time_to_live = " << std::to_string(ttl);
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "ledger_hashes"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_range"
<< " (is_latest boolean PRIMARY KEY, sequence bigint)";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "ledger_range"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
setupSessionAndTable = true;
}
cass_cluster_free(cluster);
bool setupPreparedStatements = false;
while (!setupPreparedStatements)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
std::stringstream query;
query << "INSERT INTO " << tablePrefix << "objects"
<< " (key, sequence, object) VALUES (?, ?, ?)";
if (!insertObject_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "INSERT INTO " << tablePrefix << "transactions"
<< " (hash, ledger_sequence, transaction, metadata) VALUES "
"(?, ?, "
"?, ?)";
if (!insertTransaction_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "INSERT INTO " << tablePrefix << "ledger_transactions"
<< " (ledger_sequence, hash) VALUES "
"(?, ?)";
if (!insertLedgerTransaction_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "INSERT INTO " << tablePrefix << "keys"
<< " (sequence, key) VALUES (?, ?)";
if (!insertKey_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "SELECT key FROM " << tablePrefix << "keys"
<< " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?";
if (!selectKeys_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "SELECT object, sequence FROM " << tablePrefix << "objects"
<< " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC "
"LIMIT 1";
if (!selectObject_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "SELECT transaction, metadata, ledger_sequence FROM "
<< tablePrefix << "transactions"
<< " WHERE hash = ?";
if (!selectTransaction_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "SELECT hash FROM " << tablePrefix << "ledger_transactions"
<< " WHERE ledger_sequence = ?";
if (!selectAllTransactionHashesInLedger_.prepareStatement(
query, session_.get()))
continue;
query.str("");
query << "SELECT key FROM " << tablePrefix << "objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ? "
<< " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING";
if (!selectLedgerPageKeys_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "SELECT object,key FROM " << tablePrefix << "objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ? "
<< " PER PARTITION LIMIT 1 LIMIT ? ALLOW FILTERING";
if (!selectLedgerPage_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "SELECT TOKEN(key) FROM " << tablePrefix << "objects "
<< " WHERE key = ? LIMIT 1";
if (!getToken_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " INSERT INTO " << tablePrefix << "account_tx"
<< " (account, seq_idx, hash) "
<< " VALUES (?,?,?)";
if (!insertAccountTx_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx"
<< " WHERE account = ? "
<< " AND seq_idx < ? LIMIT ?";
if (!selectAccountTx_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " INSERT INTO " << tablePrefix << "ledgers "
<< " (sequence, header) VALUES(?,?)";
if (!insertLedgerHeader_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " INSERT INTO " << tablePrefix << "ledger_hashes"
<< " (hash, sequence) VALUES(?,?)";
if (!insertLedgerHash_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " update " << tablePrefix << "ledger_range"
<< " set sequence = ? where is_latest = ? if sequence in "
"(?,null)";
if (!updateLedgerRange_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " update " << tablePrefix << "ledger_range"
<< " set sequence = ? where is_latest = false";
if (!deleteLedgerRange_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " select header from " << tablePrefix
<< "ledgers where sequence = ?";
if (!selectLedgerBySeq_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " select sequence from " << tablePrefix
<< "ledger_range where is_latest = true";
if (!selectLatestLedger_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " SELECT sequence FROM " << tablePrefix
<< "ledger_range WHERE "
<< " is_latest IN (true, false)";
if (!selectLedgerRange_.prepareStatement(query, session_.get()))
continue;
/*
query.str("");
query << " SELECT key,object FROM " << tablePrefix
<< "objects WHERE sequence = ?";
if (!selectLedgerDiff_.prepareStatement(query, session_.get()))
continue;
*/
setupPreparedStatements = true;
}
work_.emplace(ioContext_);
ioThread_ = std::thread{[this]() { ioContext_.run(); }};
open_ = true;
BOOST_LOG_TRIVIAL(info) << "Opened CassandraBackend successfully";
}
} // namespace Backend