This commit is contained in:
CJ Cobb
2021-07-13 17:06:15 +00:00
parent 599bd1b655
commit 1cdd238ad3
4 changed files with 32 additions and 5 deletions

View File

@@ -209,7 +209,6 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << __func__
<< " starting. sequence = " << std::to_string(ledgerSequence); << " starting. sequence = " << std::to_string(ledgerSequence);
bool isFirst = false;
auto keyIndex = getKeyIndexOfSeq(ledgerSequence); auto keyIndex = getKeyIndexOfSeq(ledgerSequence);
if (isFirst_) if (isFirst_)
{ {

View File

@@ -7,6 +7,7 @@ template <class T, class F>
void void
processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func)
{ {
BOOST_LOG_TRIVIAL(debug) << __func__ << " Processing async write response";
CassandraBackend const& backend = *requestParams.backend; CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut); auto rc = cass_future_error_code(fut);
if (rc != CASS_OK) if (rc != CASS_OK)
@@ -39,6 +40,7 @@ template <class T>
void void
processAsyncWrite(CassFuture* fut, void* cbData) processAsyncWrite(CassFuture* fut, void* cbData)
{ {
BOOST_LOG_TRIVIAL(debug) << __func__ << " processing async write";
T& requestParams = *static_cast<T*>(cbData); T& requestParams = *static_cast<T*>(cbData);
// TODO don't pass in func // TODO don't pass in func
processAsyncWriteResponse(requestParams, fut, requestParams.retry); processAsyncWriteResponse(requestParams, fut, requestParams.retry);
@@ -56,6 +58,7 @@ struct WriteCallbackData
WriteCallbackData(CassandraBackend const* b, T&& d, B bind) WriteCallbackData(CassandraBackend const* b, T&& d, B bind)
: backend(b), data(std::move(d)) : backend(b), data(std::move(d))
{ {
BOOST_LOG_TRIVIAL(debug) << "Making WriteCallbackData";
retry = [bind, this](auto& params, bool isRetry) { retry = [bind, this](auto& params, bool isRetry) {
auto statement = bind(params); auto statement = bind(params);
backend->executeAsyncWrite( backend->executeAsyncWrite(
@@ -69,7 +72,10 @@ struct WriteCallbackData
virtual void virtual void
start() start()
{ {
BOOST_LOG_TRIVIAL(debug) << "Starting";
BOOST_LOG_TRIVIAL(debug) << "address is " << this;
retry(*this, false); retry(*this, false);
BOOST_LOG_TRIVIAL(debug) << "Started";
} }
virtual void virtual void
@@ -82,6 +88,7 @@ struct WriteCallbackData
} }
virtual ~WriteCallbackData() virtual ~WriteCallbackData()
{ {
BOOST_LOG_TRIVIAL(debug) << __func__;
} }
}; };
template <class T, class B> template <class T, class B>
@@ -342,8 +349,11 @@ CassandraBackend::fetchTransactions(
statement.bindBytes(hashes[i]); statement.bindBytes(hashes[i]);
cbs.push_back(std::make_shared<ReadCallbackData>( cbs.push_back(std::make_shared<ReadCallbackData>(
numOutstanding, mtx, cv, [i, &results](auto& result) { numOutstanding, mtx, cv, [i, &results](auto& result) {
if (result.hasResult())
results[i] = { results[i] = {
result.getBytes(), result.getBytes(), result.getUInt32()}; result.getBytes(),
result.getBytes(),
result.getUInt32()};
})); }));
executeAsyncRead(statement, processAsyncRead, *cbs[i]); executeAsyncRead(statement, processAsyncRead, *cbs[i]);
} }
@@ -480,6 +490,7 @@ CassandraBackend::fetchLedgerObjects(
{ {
cbs.push_back(std::make_shared<ReadCallbackData>( cbs.push_back(std::make_shared<ReadCallbackData>(
numOutstanding, mtx, cv, [i, &results](auto& result) { numOutstanding, mtx, cv, [i, &results](auto& result) {
if (result.hasResult())
results[i] = result.getBytes(); results[i] = result.getBytes();
})); }));
CassandraStatement statement{selectObject_}; CassandraStatement statement{selectObject_};
@@ -515,7 +526,7 @@ CassandraBackend::writeKeys(
statement.bindBytes(key); statement.bindBytes(key);
return statement; return statement;
}; };
std::atomic_int numOutstanding = keys.size(); std::atomic_int numOutstanding = 0;
std::condition_variable cv; std::condition_variable cv;
std::mutex mtx; std::mutex mtx;
std::vector<std::shared_ptr<BulkWriteCallbackData< std::vector<std::shared_ptr<BulkWriteCallbackData<

View File

@@ -108,6 +108,15 @@ public:
cass_statement_set_consistency(statement_, CASS_CONSISTENCY_QUORUM); cass_statement_set_consistency(statement_, CASS_CONSISTENCY_QUORUM);
} }
CassandraStatement(CassandraStatement&& other)
{
statement_ = other.statement_;
other.statement_ = nullptr;
curBindingIndex_ = other.curBindingIndex_;
other.curBindingIndex_ = 0;
}
CassandraStatement(CassandraStatement const& other) = delete;
CassStatement* CassStatement*
get() const get() const
{ {
@@ -950,11 +959,15 @@ public:
T callback, T callback,
S& callbackData) const S& callbackData) const
{ {
BOOST_LOG_TRIVIAL(debug) << "Executing";
BOOST_LOG_TRIVIAL(debug) << "address = " << &callbackData;
CassFuture* fut = cass_session_execute(session_.get(), statement.get()); CassFuture* fut = cass_session_execute(session_.get(), statement.get());
cass_future_set_callback( cass_future_set_callback(
fut, callback, static_cast<void*>(&callbackData)); fut, callback, static_cast<void*>(&callbackData));
cass_future_free(fut); cass_future_free(fut);
BOOST_LOG_TRIVIAL(debug) << "Submitted callback";
} }
template <class T, class S> template <class T, class S>
void void

View File

@@ -114,10 +114,13 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
<< "Deserialized ledger header. " << detail::toString(lgrInfo); << "Deserialized ledger header. " << detail::toString(lgrInfo);
backend_->startWrites(); backend_->startWrites();
BOOST_LOG_TRIVIAL(debug) << __func__ << " started writes";
backend_->writeLedger( backend_->writeLedger(
lgrInfo, std::move(*ledgerData->mutable_ledger_header()), true); lgrInfo, std::move(*ledgerData->mutable_ledger_header()), true);
BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote ledger";
std::vector<AccountTransactionsData> accountTxData = std::vector<AccountTransactionsData> accountTxData =
insertTransactions(lgrInfo, *ledgerData); insertTransactions(lgrInfo, *ledgerData);
BOOST_LOG_TRIVIAL(debug) << __func__ << " inserted txns";
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
@@ -126,6 +129,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
// consumes from the queue and inserts the data into the Ledger object. // consumes from the queue and inserts the data into the Ledger object.
// Once the below call returns, all data has been pushed into the queue // Once the below call returns, all data has been pushed into the queue
loadBalancer_->loadInitialLedger(startingSequence); loadBalancer_->loadInitialLedger(startingSequence);
BOOST_LOG_TRIVIAL(debug) << __func__ << " loaded initial ledger";
if (!stopping_) if (!stopping_)
{ {