diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index b1f9481b..615ea048 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -1,5 +1,37 @@ #include namespace Backend { +template +void +processAsyncWriteResponse(T&& requestParams, CassFuture* fut, F func) +{ + CassandraBackend const& backend = *requestParams.backend; + auto rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + BOOST_LOG_TRIVIAL(error) + << "ERROR!!! Cassandra insert error: " << rc << ", " + << cass_error_desc(rc) << ", retrying "; + // exponential backoff with a max wait of 2^10 ms (about 1 second) + auto wait = std::chrono::milliseconds( + lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); + ++requestParams.currentRetries; + std::shared_ptr timer = + std::make_shared( + backend.getIOContext(), + std::chrono::steady_clock::now() + wait); + timer->async_wait([timer, &requestParams, &func]( + const boost::system::error_code& error) { + func(requestParams, true); + }); + } + else + { + backend.finishAsyncWrite(); + int remaining = --requestParams.refs; + if (remaining == 0) + delete &requestParams; + } +} // Process the result of an asynchronous write. Retry on error // @param fut cassandra future associated with the write // @param cbData struct that holds the request parameters @@ -8,72 +40,31 @@ flatMapWriteCallback(CassFuture* fut, void* cbData) { CassandraBackend::WriteCallbackData& requestParams = *static_cast(cbData); - CassandraBackend const& backend = *requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.ioContext_, std::chrono::steady_clock::now() + wait); - timer->async_wait([timer, &requestParams, &backend]( - const boost::system::error_code& error) { - backend.write(requestParams, true); - }); - } - else - { - --(backend.numRequestsOutstanding_); + auto func = [&requestParams](auto& params, bool retry) { + requestParams.backend->write(params, retry); + }; - backend.throttleCv_.notify_all(); - if (backend.numRequestsOutstanding_ == 0) - backend.syncCv_.notify_all(); - int remaining = --requestParams.refs; - if (remaining == 0) - delete &requestParams; - } + processAsyncWriteResponse(std::move(requestParams), fut, func); } void flatMapWriteBookCallback(CassFuture* fut, void* cbData) { CassandraBackend::WriteCallbackData& requestParams = *static_cast(cbData); - CassandraBackend const& backend = *requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.ioContext_, std::chrono::steady_clock::now() + wait); - timer->async_wait([timer, &requestParams, &backend]( - const boost::system::error_code& error) { - backend.writeBook(requestParams, true); - }); - } - else - { - --(backend.numRequestsOutstanding_); + auto func = [&requestParams](auto& params, bool retry) { + requestParams.backend->writeBook(params, retry); + }; + processAsyncWriteResponse(std::move(requestParams), fut, func); +} - backend.throttleCv_.notify_all(); - if (backend.numRequestsOutstanding_ == 0) - backend.syncCv_.notify_all(); - int remaining = --requestParams.refs; - if (remaining == 0) - delete &requestParams; - } +void +retryWriteKey(CassandraBackend::WriteCallbackData& requestParams, bool isRetry) +{ + auto const& backend = *requestParams.backend; + if (requestParams.isDeleted) + backend.writeDeletedKey(requestParams, true); + else + backend.writeKey(requestParams, true); } void @@ -81,39 +72,7 @@ flatMapWriteKeyCallback(CassFuture* fut, void* cbData) { CassandraBackend::WriteCallbackData& requestParams = *static_cast(cbData); - CassandraBackend const& backend = *requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.ioContext_, std::chrono::steady_clock::now() + wait); - timer->async_wait([timer, &requestParams, &backend]( - const boost::system::error_code& error) { - if (requestParams.isDeleted) - backend.writeDeletedKey(requestParams, true); - else - backend.writeKey(requestParams, true); - }); - } - else - { - --(backend.numRequestsOutstanding_); - - backend.throttleCv_.notify_all(); - if (backend.numRequestsOutstanding_ == 0) - backend.syncCv_.notify_all(); - int remaining = --requestParams.refs; - if (remaining == 0) - delete &requestParams; - } + processAsyncWriteResponse(std::move(requestParams), fut, retryWriteKey); } void flatMapGetCreatedCallback(CassFuture* fut, void* cbData) @@ -148,30 +107,16 @@ flatMapGetCreatedCallback(CassFuture* fut, void* cbData) if (backend.numRequestsOutstanding_ == 0) backend.syncCv_.notify_all(); }; - CassResult const* res = cass_future_get_result(fut); + CassandraResult result{cass_future_get_result(fut)}; - CassRow const* row = cass_result_first_row(res); - if (!row) + if (!result) { - cass_result_free(res); BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc << ", " << cass_error_desc(rc); finish(); return; } - cass_int64_t created; - rc = cass_value_get_int64(cass_row_get_column(row, 0), &created); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) - << "Cassandra fetch get bytes error : " << rc << ", " - << cass_error_desc(rc); - finish(); - return; - } - cass_result_free(res); - requestParams.createdSequence = created; + requestParams.createdSequence = result.getUInt32(); backend.writeDeletedKey(requestParams, false); } } @@ -180,138 +125,41 @@ flatMapWriteTransactionCallback(CassFuture* fut, void* cbData) { CassandraBackend::WriteTransactionCallbackData& requestParams = *static_cast(cbData); - CassandraBackend const& backend = *requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.ioContext_, std::chrono::steady_clock::now() + wait); - timer->async_wait([timer, &requestParams, &backend]( - const boost::system::error_code& error) { - backend.writeTransaction(requestParams, true); - }); - } - else - { - --(backend.numRequestsOutstanding_); - - backend.throttleCv_.notify_all(); - if (backend.numRequestsOutstanding_ == 0) - backend.syncCv_.notify_all(); - delete &requestParams; - } + auto func = [&requestParams](auto& params, bool retry) { + requestParams.backend->writeTransaction(params, retry); + }; + processAsyncWriteResponse(std::move(requestParams), fut, func); } void flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData) { CassandraBackend::WriteAccountTxCallbackData& requestParams = *static_cast(cbData); - CassandraBackend const& backend = *requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.ioContext_, std::chrono::steady_clock::now() + wait); - timer->async_wait([timer, &requestParams, &backend]( - const boost::system::error_code& error) { - backend.writeAccountTx(requestParams, true); - }); - } - else - { - --(backend.numRequestsOutstanding_); - - backend.throttleCv_.notify_all(); - if (backend.numRequestsOutstanding_ == 0) - backend.syncCv_.notify_all(); - int remaining = --requestParams.refs; - if (remaining == 0) - delete &requestParams; - } + auto func = [&requestParams](auto& params, bool retry) { + requestParams.backend->writeAccountTx(params, retry); + }; + processAsyncWriteResponse(std::move(requestParams), fut, func); } void flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData) { CassandraBackend::WriteLedgerHeaderCallbackData& requestParams = *static_cast(cbData); - CassandraBackend const& backend = *requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.ioContext_, std::chrono::steady_clock::now() + wait); - timer->async_wait([timer, &requestParams, &backend]( - const boost::system::error_code& error) { - backend.writeLedgerHeader(requestParams, true); - }); - } - else - { - --(backend.numRequestsOutstanding_); - - backend.throttleCv_.notify_all(); - if (backend.numRequestsOutstanding_ == 0) - backend.syncCv_.notify_all(); - delete &requestParams; - } + auto func = [&requestParams](auto& params, bool retry) { + requestParams.backend->writeLedgerHeader(params, retry); + }; + processAsyncWriteResponse(std::move(requestParams), fut, func); } + void flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData) { CassandraBackend::WriteLedgerHashCallbackData& requestParams = *static_cast(cbData); - CassandraBackend const& backend = *requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.ioContext_, std::chrono::steady_clock::now() + wait); - timer->async_wait([timer, &requestParams, &backend]( - const boost::system::error_code& error) { - backend.writeLedgerHash(requestParams, true); - }); - } - else - { - --(backend.numRequestsOutstanding_); - - backend.throttleCv_.notify_all(); - if (backend.numRequestsOutstanding_ == 0) - backend.syncCv_.notify_all(); - delete &requestParams; - } + auto func = [&requestParams](auto& params, bool retry) { + requestParams.backend->writeLedgerHash(params, retry); + }; + processAsyncWriteResponse(std::move(requestParams), fut, func); } // Process the result of an asynchronous read. Retry on error @@ -322,69 +170,15 @@ flatMapReadCallback(CassFuture* fut, void* cbData) { CassandraBackend::ReadCallbackData& requestParams = *static_cast(cbData); + auto func = [&requestParams](auto& params) { + requestParams.backend.read(params); + }; + CassandraAsyncResult asyncResult{requestParams, fut, func}; + CassandraResult& result = asyncResult.getResult(); - CassError rc = cass_future_error_code(fut); - - if (rc != CASS_OK) + if (!!result) { - BOOST_LOG_TRIVIAL(warning) << "Cassandra fetch error : " << rc << " : " - << cass_error_desc(rc) << " - retrying"; - // Retry right away. The only time the cluster should ever be overloaded - // is when the very first ledger is being written in full (millions of - // writes at once), during which no reads should be occurring. If reads - // are timing out, the code/architecture should be modified to handle - // greater read load, as opposed to just exponential backoff - requestParams.backend.read(requestParams); - } - else - { - auto finish = [&requestParams]() { - size_t batchSize = requestParams.batchSize; - if (++(requestParams.numFinished) == batchSize) - requestParams.cv.notify_all(); - }; - CassResult const* res = cass_future_get_result(fut); - - CassRow const* row = cass_result_first_row(res); - if (!row) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc - << ", " << cass_error_desc(rc); - finish(); - return; - } - cass_byte_t const* buf; - std::size_t bufSize; - rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) - << "Cassandra fetch get bytes error : " << rc << ", " - << cass_error_desc(rc); - finish(); - return; - } - - std::vector txn{buf, buf + bufSize}; - cass_byte_t const* buf2; - std::size_t buf2Size; - rc = - cass_value_get_bytes(cass_row_get_column(row, 1), &buf2, &buf2Size); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) - << "Cassandra fetch get bytes error : " << rc << ", " - << cass_error_desc(rc); - finish(); - return; - } - std::vector meta{buf2, buf2 + buf2Size}; - requestParams.result = {std::move(txn), std::move(meta)}; - cass_result_free(res); - finish(); + requestParams.result = {result.getBytes(), result.getBytes()}; } } @@ -396,59 +190,15 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData) { CassandraBackend::ReadObjectCallbackData& requestParams = *static_cast(cbData); + auto func = [&requestParams](auto& params) { + requestParams.backend.readObject(params); + }; + CassandraAsyncResult asyncResult{requestParams, fut, func}; + CassandraResult& result = asyncResult.getResult(); - CassError rc = cass_future_error_code(fut); - - if (rc != CASS_OK) + if (!!result) { - BOOST_LOG_TRIVIAL(warning) << "Cassandra fetch error : " << rc << " : " - << cass_error_desc(rc) << " - retrying"; - // Retry right away. The only time the cluster should ever be overloaded - // is when the very first ledger is being written in full (millions of - // writes at once), during which no reads should be occurring. If reads - // are timing out, the code/architecture should be modified to handle - // greater read load, as opposed to just exponential backoff - requestParams.backend.readObject(requestParams); - } - else - { - auto finish = [&requestParams]() { - BOOST_LOG_TRIVIAL(trace) - << "flatMapReadObjectCallback - finished a read"; - size_t batchSize = requestParams.batchSize; - if (++(requestParams.numFinished) == batchSize) - requestParams.cv.notify_all(); - }; - CassResult const* res = cass_future_get_result(fut); - - CassRow const* row = cass_result_first_row(res); - if (!row) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) - << "Cassandra fetch get row error : " << rc << ", " - << cass_error_desc(rc) - << " key = " << ripple::strHex(requestParams.key); - finish(); - return; - } - cass_byte_t const* buf; - std::size_t bufSize; - rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) - << "Cassandra fetch get bytes error : " << rc << ", " - << cass_error_desc(rc); - finish(); - return; - } - - std::vector obj{buf, buf + bufSize}; - requestParams.result = std::move(obj); - cass_result_free(res); - finish(); + requestParams.result = result.getBytes(); } } @@ -456,68 +206,24 @@ std::optional CassandraBackend::fetchLedgerRange() const { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; - auto start = std::chrono::system_clock::now(); - CassStatement* statement = cass_prepared_bind(selectLedgerRange_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassFuture* fut; - CassError rc; - do + CassandraStatement statement{selectLedgerRange_}; + CassandraResult result = executeSyncRead(statement); + if (!result) { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); - - CassResult const* res = cass_future_get_result(fut); - cass_statement_free(statement); - cass_future_free(fut); - CassIterator* iter = cass_iterator_from_result(res); - std::optional min; - std::optional max; - if (cass_iterator_next(iter)) - { - cass_int64_t sequence; - rc = cass_value_get_int64( - cass_row_get_column(cass_iterator_get_row(iter), 0), &sequence); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - LedgerRange range; - range.minSequence = sequence; - if (!cass_iterator_next(iter)) - { - cass_result_free(res); - return range; - } - rc = cass_value_get_int64( - cass_row_get_column(cass_iterator_get_row(iter), 0), &sequence); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - cass_result_free(res); - range.maxSequence = sequence; - if (range.minSequence > range.maxSequence) - { - std::swap(range.minSequence, range.maxSequence); - } - return range; + BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; + return {}; } - return {}; + LedgerRange range; + range.maxSequence = range.minSequence = result.getUInt32(); + if (result.nextRow()) + { + range.maxSequence = result.getUInt32(); + } + if (range.minSequence > range.maxSequence) + { + std::swap(range.minSequence, range.maxSequence); + } + return range; } void @@ -1098,13 +804,13 @@ CassandraBackend::open() query << "INSERT INTO " << tableName << "flattransactions" << " (hash, sequence, transaction, metadata) VALUES (?, ?, " "?, ?)"; - if (!insertTransaction_.prepare(query, session_.get())) + if (!insertTransaction_.prepareStatement(query, session_.get())) continue; query = {}; query << "INSERT INTO " << tableName << "keys" << " (key, created, deleted) VALUES (?, ?, ?)"; - if (!insertKey_.prepare(query, session_.get())) + if (!insertKey_.prepareStatement(query, session_.get())) continue; query = {}; @@ -1150,7 +856,8 @@ CassandraBackend::open() /* query = {}; - query << "SELECT filterempty(key,object) FROM " << tableName << "flat " + query << "SELECT filterempty(key,object) FROM " << tableName << + "flat " << " WHERE TOKEN(key) >= ? and sequence <= ?" << " PER PARTITION LIMIT 1 LIMIT ?" << " ALLOW FILTERING"; @@ -1172,8 +879,6 @@ CassandraBackend::open() if (!getBook_.prepareStatement(query, session_.get())) continue; - getBook_ = cass_future_get_prepared(prepare_future); - query = {}; query << " INSERT INTO " << tableName << "account_tx" << " (account, seq_idx, hash) " @@ -1224,8 +929,6 @@ CassandraBackend::open() if (!selectLedgerRange_.prepareStatement(query, session_.get())) continue; - selectLedgerRange_ = cass_future_get_prepared(prepare_future); - setupPreparedStatements = true; } diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index a4c93c19..82692f6d 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -61,25 +61,24 @@ class CassandraPreparedStatement { private: CassPrepared const* prepared_ = nullptr; - CassStatement* statement_ = nullptr; public: CassPrepared const* - get() + get() const { - return statement_; + return prepared_; } bool - prepareStatement(std::stringstream const& query) + prepareStatement(std::stringstream const& query, CassSession* session) { - return prepareStatement(query.str().c_str()); + return prepareStatement(query.str().c_str(), session); } bool - prepareStatement(std::string const& query) + prepareStatement(std::string const& query, CassSession* session) { - return prepareStatement(query.c_str()); + return prepareStatement(query.c_str(), session); } bool @@ -122,6 +121,7 @@ class CassandraStatement CassStatement* statement_ = nullptr; size_t curBindingIndex_ = 0; +public: CassandraStatement(CassandraPreparedStatement const& prepared) { statement_ = cass_prepared_bind(prepared.get()); @@ -134,26 +134,55 @@ class CassandraStatement return statement_; } - bool + void + bindBoolean(bool val) + { + if (!statement_) + throw std::runtime_error( + "CassandraStatement::bindBoolean - statement_ is null"); + CassError rc = cass_statement_bind_bool( + statement_, 1, static_cast(val)); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Error binding boolean to statement: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + throw std::runtime_error(ss.str()); + } + curBindingIndex_++; + } + + void bindBytes(const char* data, uint32_t size) { - return bindBytes((unsigned char*)data, size); + bindBytes((unsigned char*)data, size); } - template - bool - bindBytes(ripple::base_uint const& data) + void + bindBytes(ripple::uint256 const& data) { - return bindBytes(data.data(), data.size()); + bindBytes(data.data(), data.size()); + } + void + bindBytes(ripple::AccountID const& data) + { + bindBytes(data.data(), data.size()); } - bool + void bindBytes(std::string const& data) { - return bindBytes(data.data(), data.size()); + bindBytes(data.data(), data.size()); } - bool + void + bindBytes(void const* key, uint32_t size) + { + bindBytes(static_cast(key), size); + } + + void bindBytes(const unsigned char* data, uint32_t size) { if (!statement_) @@ -170,19 +199,18 @@ class CassandraStatement ss << "Error binding bytes to statement: " << rc << ", " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - return false; + throw std::runtime_error(ss.str()); } curBindingIndex_++; - return true; } - bool + void bindInt(uint32_t value) { - return bindInt((int64_t)value); + bindInt((int64_t)value); } - bool + void bindInt(int64_t value) { if (!statement_) @@ -196,24 +224,23 @@ class CassandraStatement ss << "Error binding int to statement: " << rc << ", " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - return false; + throw std::runtime_error(ss.str()); } curBindingIndex_++; - return true; } - bool + void bindIntTuple(uint32_t first, uint32_t second) { CassTuple* tuple = cass_tuple_new(2); - rc = cass_tuple_set_int64(tuple, 0, first); + CassError rc = cass_tuple_set_int64(tuple, 0, first); if (rc != CASS_OK) { std::stringstream ss; ss << "Error binding int to tuple: " << rc << ", " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - return false; + throw std::runtime_error(ss.str()); } rc = cass_tuple_set_int64(tuple, 1, second); if (rc != CASS_OK) @@ -222,18 +249,18 @@ class CassandraStatement ss << "Error binding int to tuple: " << rc << ", " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - return false; + throw std::runtime_error(ss.str()); } rc = cass_statement_bind_tuple(statement_, curBindingIndex_, tuple); if (rc != CASS_OK) { std::stringstream ss; - ss << "Binding tuple: " << rc << ", " << cass_error_desc(rc); + ss << "Error binding tuple to statement: " << rc << ", " + << cass_error_desc(rc); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - return false; + throw std::runtime_error(ss.str()); } curBindingIndex_++; - return true; } CassandraStatement() @@ -243,6 +270,198 @@ class CassandraStatement } }; +class CassandraResult +{ + CassResult const* result_ = nullptr; + CassRow const* row_ = nullptr; + CassIterator* iter_ = nullptr; + size_t curGetIndex_ = 0; + +public: + CassandraResult() + { + } + + CassandraResult(CassandraResult const& other) = delete; + + CassandraResult(CassResult const* result) : result_(result) + { + if (!result_) + throw std::runtime_error("CassandraResult - result is null"); + iter_ = cass_iterator_from_result(result_); + if (cass_iterator_next(iter_)) + { + row_ = cass_iterator_get_row(iter_); + } + } + + bool + hasResult() + { + return row_ != nullptr; + } + + bool + operator!() + { + return !hasResult(); + } + + size_t + numRows() + { + return cass_result_row_count(result_); + } + + bool + nextRow() + { + curGetIndex_ = 0; + if (cass_iterator_next(iter_)) + { + row_ = cass_iterator_get_row(iter_); + return true; + } + row_ = nullptr; + return false; + } + + std::vector + getBytes() + { + if (!row_) + throw std::runtime_error("CassandraResult::getBytes - no result"); + cass_byte_t const* buf; + std::size_t bufSize; + CassError rc = cass_value_get_bytes( + cass_row_get_column(row_, curGetIndex_), &buf, &bufSize); + if (rc != CASS_OK) + { + std::stringstream msg; + msg << "CassandraResult::getBytes - error getting value: " << rc + << ", " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << msg.str(); + throw std::runtime_error(msg.str()); + } + curGetIndex_++; + return {buf, buf + bufSize}; + } + + ripple::uint256 + getUInt256() + { + if (!row_) + throw std::runtime_error("CassandraResult::getBytes - no result"); + cass_byte_t const* buf; + std::size_t bufSize; + CassError rc = cass_value_get_bytes( + cass_row_get_column(row_, curGetIndex_), &buf, &bufSize); + if (rc != CASS_OK) + { + std::stringstream msg; + msg << "CassandraResult::getBytes - error getting value: " << rc + << ", " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << msg.str(); + throw std::runtime_error(msg.str()); + } + curGetIndex_++; + return ripple::uint256::fromVoid(buf); + } + + int64_t + getInt64() + { + if (!row_) + throw std::runtime_error("CassandraResult::getInt64 - no result"); + cass_int64_t val; + CassError rc = + cass_value_get_int64(cass_row_get_column(row_, curGetIndex_), &val); + if (rc != CASS_OK) + { + std::stringstream msg; + msg << "CassandraResult::getInt64 - error getting value: " << rc + << ", " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << msg.str(); + throw std::runtime_error(msg.str()); + } + ++curGetIndex_; + return val; + } + + uint32_t + getUInt32() + { + return (uint32_t)getInt64(); + } + + std::pair + getInt64Tuple() + { + if (!row_) + throw std::runtime_error( + "CassandraResult::getInt64Tuple - no result"); + CassValue const* tuple = cass_row_get_column(row_, curGetIndex_); + CassIterator* tupleIter = cass_iterator_from_tuple(tuple); + if (!cass_iterator_next(tupleIter)) + throw std::runtime_error( + "CassandraResult::getInt64Tuple - failed to iterate tuple"); + CassValue const* value = cass_iterator_get_value(tupleIter); + int64_t first; + cass_value_get_int64(value, &first); + if (!cass_iterator_next(tupleIter)) + throw std::runtime_error( + "CassandraResult::getInt64Tuple - failed to iterate tuple"); + value = cass_iterator_get_value(tupleIter); + int64_t second; + cass_value_get_int64(value, &second); + ++curGetIndex_; + return {first, second}; + } + + ~CassandraResult() + { + cass_result_free(result_); + } +}; +template +class CassandraAsyncResult +{ + T& requestParams_; + CassandraResult result_; + +public: + CassandraAsyncResult(T& requestParams, CassFuture* fut, F retry) + : requestParams_(requestParams) + { + CassError rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + retry(requestParams_); + } + else + { + result_ = CassandraResult(cass_future_get_result(fut)); + } + } + + ~CassandraAsyncResult() + { + if (!!result_) + { + BOOST_LOG_TRIVIAL(trace) << "finished a request"; + size_t batchSize = requestParams_.batchSize; + if (++(requestParams_.numFinished) == batchSize) + requestParams_.cv.notify_all(); + } + } + + CassandraResult& + getResult() + { + return result_; + } +}; + class CassandraBackend : public BackendInterface { private: @@ -378,117 +597,36 @@ public: std::optional const& cursor) const override { BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx"; - CassStatement* statement = cass_prepared_bind(selectAccountTx_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - - CassError rc = cass_statement_bind_bytes( - statement, 0, static_cast(account.data()), 20); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra account_tx query: " << rc << ", " - << cass_error_desc(rc); - return {}; - } - CassTuple* cassCursor = cass_tuple_new(2); + CassandraStatement statement{selectAccountTx_}; + statement.bindBytes(account); if (cursor) - { - cass_tuple_set_int64(cassCursor, 0, cursor->ledgerSequence); - cass_tuple_set_int64(cassCursor, 1, cursor->transactionIndex); - } + statement.bindIntTuple( + cursor->ledgerSequence, cursor->transactionIndex); else + statement.bindIntTuple(INT32_MAX, INT32_MAX); + statement.bindInt(limit); + CassandraResult result = executeSyncRead(statement); + if (!result.hasResult()) { - cass_tuple_set_int64(cassCursor, 0, INT32_MAX); - cass_tuple_set_int64(cassCursor, 1, INT32_MAX); - } - rc = cass_statement_bind_tuple(statement, 1, cassCursor); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra tuple to account_tx: " << rc << ", " - << cass_error_desc(rc); - return {}; - } - rc = cass_statement_bind_int64(statement, 2, limit); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding limit to account_tx query: " << rc << ", " - << cass_error_desc(rc); - return {}; + BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows returned"; + return {{}, {}}; } - CassFuture* fut; + std::vector hashes; + size_t numRows = result.numRows(); + bool returnCursor = numRows == limit; + std::optional retCursor; do { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra account_tx fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); - - CassResult const* res = cass_future_get_result(fut); - cass_statement_free(statement); - cass_future_free(fut); - - BOOST_LOG_TRIVIAL(debug) << "doAccountTx - got hashes"; - std::vector hashes; - size_t numRows = cass_result_row_count(res); - bool more = numRows == 300; - - CassIterator* iter = cass_iterator_from_result(res); - std::optional retCursor; - while (cass_iterator_next(iter)) - { - CassRow const* row = cass_iterator_get_row(iter); - - cass_byte_t const* outData; - std::size_t outSize; - - CassValue const* hash = cass_row_get_column(row, 0); - rc = cass_value_get_bytes(hash, &outData, &outSize); - if (rc != CASS_OK) - { - cass_iterator_free(iter); - - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - hashes.push_back(ripple::uint256::fromVoid(outData)); + hashes.push_back(result.getUInt256()); --numRows; - if (numRows == 0) + if (numRows == 0 && returnCursor) { - if (more) - { - CassValue const* cassCursorVal = - cass_row_get_column(row, 1); - CassIterator* tupleIter = - cass_iterator_from_tuple(cassCursorVal); - cass_iterator_next(tupleIter); - CassValue const* seqVal = - cass_iterator_get_value(tupleIter); - cass_iterator_next(tupleIter); - CassValue const* idxVal = - cass_iterator_get_value(tupleIter); - int64_t seqOut; - int64_t idxOut; - cass_value_get_int64(seqVal, &seqOut); - cass_value_get_int64(idxVal, &idxOut); - retCursor = {(uint32_t)seqOut, (uint32_t)idxOut}; - } + auto [lgrSeq, txnIdx] = result.getInt64Tuple(); + retCursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx}; } - } + } while (result.nextRow()); + BOOST_LOG_TRIVIAL(debug) << "doAccountTx - populated hashes. num hashes = " << hashes.size(); if (hashes.size()) @@ -505,6 +643,7 @@ public: std::string header; uint32_t currentRetries = 0; + std::atomic refs = 1; WriteLedgerHeaderCallbackData( CassandraBackend const* f, uint32_t sequence, @@ -520,6 +659,7 @@ public: uint32_t sequence; uint32_t currentRetries = 0; + std::atomic refs = 1; WriteLedgerHashCallbackData( CassandraBackend const* f, ripple::uint256 hash, @@ -537,74 +677,17 @@ public: // write range if (isFirstLedger_) { - CassStatement* statement = cass_prepared_bind(updateLedgerRange_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassError rc = - cass_statement_bind_int64(statement, 0, ledgerSequence_); - rc = cass_statement_bind_bool(statement, 1, cass_false); - - rc = cass_statement_bind_int64(statement, 2, ledgerSequence_); - CassFuture* fut; - do - { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra write error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); - cass_statement_free(statement); + CassandraStatement statement{updateLedgerRange_}; + statement.bindInt(ledgerSequence_); + statement.bindBoolean(false); + statement.bindInt(ledgerSequence_); + executeSyncWrite(statement); } - CassStatement* statement = cass_prepared_bind(updateLedgerRange_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - // TODO check rc - CassError rc = cass_statement_bind_int64(statement, 0, ledgerSequence_); - assert(rc == CASS_OK); - rc = cass_statement_bind_bool(statement, 1, cass_true); - assert(rc == CASS_OK); - rc = cass_statement_bind_int64(statement, 2, ledgerSequence_); - assert(rc == CASS_OK); - CassFuture* fut; - do - { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra write error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); - cass_statement_free(statement); - CassResult const* res = cass_future_get_result(fut); - cass_future_free(fut); - - CassRow const* row = cass_result_first_row(res); - if (!row) - { - BOOST_LOG_TRIVIAL(error) << "Cassandra write error: no rows"; - cass_result_free(res); - return false; - } - cass_bool_t success; - rc = cass_value_get_bool(cass_row_get_column(row, 0), &success); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) << "Cassandra write error: " << rc << ", " - << cass_error_desc(rc); - return false; - } - cass_result_free(res); - return success == cass_true; + CassandraStatement statement{updateLedgerRange_}; + statement.bindInt(ledgerSequence_); + statement.bindBoolean(true); + statement.bindInt(ledgerSequence_); + return executeSyncUpdate(statement); } void writeLedger( @@ -625,205 +708,53 @@ public: void writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const { - { - std::unique_lock lck(throttleMutex_); - if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " : " - << "Max outstanding requests reached. " - << "Waiting for other requests to finish"; - throttleCv_.wait(lck, [this]() { - return numRequestsOutstanding_ < maxRequestsOutstanding; - }); - } - } - CassStatement* statement = cass_prepared_bind(insertLedgerHash_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassError rc = cass_statement_bind_bytes( - statement, 0, static_cast(cb.hash.data()), 32); - - assert(rc == CASS_OK); - rc = cass_statement_bind_int64(statement, 1, cb.sequence); - assert(rc == CASS_OK); - // actually do the write - CassFuture* fut = cass_session_execute(session_.get(), statement); - cass_statement_free(statement); - - cass_future_set_callback( - fut, flatMapWriteLedgerHashCallback, static_cast(&cb)); - cass_future_free(fut); + CassandraStatement statement{insertLedgerHash_}; + statement.bindBytes(cb.hash); + statement.bindInt(cb.sequence); + executeAsyncWrite( + statement, flatMapWriteLedgerHashCallback, cb, isRetry); } void writeLedgerHeader(WriteLedgerHeaderCallbackData& cb, bool isRetry) const { - // write header - { - std::unique_lock lck(throttleMutex_); - if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " : " - << "Max outstanding requests reached. " - << "Waiting for other requests to finish"; - throttleCv_.wait(lck, [this]() { - return numRequestsOutstanding_ < maxRequestsOutstanding; - }); - } - } - unsigned char* headerRaw = (unsigned char*)cb.header.data(); - CassStatement* statement = cass_prepared_bind(insertLedgerHeader_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassError rc = cass_statement_bind_int64(statement, 0, cb.sequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra insert ledger header: " << rc << ", " - << cass_error_desc(rc); - return; - } - rc = cass_statement_bind_bytes( - statement, - 1, - static_cast(headerRaw), - cb.header.size()); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra insert ledger header: " << rc << ", " - << cass_error_desc(rc); - return; - } - // actually do the write - CassFuture* fut = cass_session_execute(session_.get(), statement); - cass_statement_free(statement); - - cass_future_set_callback( - fut, flatMapWriteLedgerHeaderCallback, static_cast(&cb)); - cass_future_free(fut); + CassandraStatement statement{insertLedgerHeader_}; + statement.bindInt(cb.sequence); + statement.bindBytes(cb.header); + executeAsyncWrite( + statement, flatMapWriteLedgerHeaderCallback, cb, isRetry); } std::optional fetchLatestLedgerSequence() const override { - BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; - auto start = std::chrono::system_clock::now(); - CassStatement* statement = cass_prepared_bind(selectLatestLedger_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassFuture* fut; - CassError rc; - do + BOOST_LOG_TRIVIAL(trace) << __func__; + CassandraStatement statement{selectLatestLedger_}; + CassandraResult result = executeSyncRead(statement); + if (!result.hasResult()) { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); - - CassResult const* res = cass_future_get_result(fut); - cass_statement_free(statement); - cass_future_free(fut); - - CassRow const* row = cass_result_first_row(res); - if (!row) - { - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows"; - cass_result_free(res); + BOOST_LOG_TRIVIAL(error) + << "CassandraBackend::fetchLatestLedgerSequence - no rows"; return {}; } - cass_int64_t sequence; - rc = cass_value_get_int64(cass_row_get_column(row, 0), &sequence); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - cass_result_free(res); - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(debug) - << "Fetched from cassandra in " - << std::chrono::duration_cast( - end - start) - .count() - << " microseconds"; - return sequence; + return result.getUInt32(); } std::optional fetchLedgerBySequence(uint32_t sequence) const override { - BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; - auto start = std::chrono::system_clock::now(); - CassStatement* statement = cass_prepared_bind(selectLedgerBySeq_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassError rc = cass_statement_bind_int64(statement, 0, sequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra ledger fetch query: " << rc << ", " - << cass_error_desc(rc); - return {}; - } - CassFuture* fut; - do - { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); + BOOST_LOG_TRIVIAL(trace) << __func__; + CassandraStatement statement{selectLedgerBySeq_}; + statement.bindInt(sequence); + CassandraResult result = executeSyncRead(statement); - CassResult const* res = cass_future_get_result(fut); - cass_statement_free(statement); - cass_future_free(fut); - - CassRow const* row = cass_result_first_row(res); - if (!row) + if (!result) { - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows"; - cass_result_free(res); + BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; return {}; } - cass_byte_t const* buf; - std::size_t bufSize; - rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - std::vector result{buf, buf + bufSize}; - ripple::LedgerInfo lgrInfo = - deserializeHeader(ripple::makeSlice(result)); - cass_result_free(res); - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(debug) - << "Fetched from cassandra in " - << std::chrono::duration_cast( - end - start) - .count() - << " microseconds"; - return lgrInfo; + std::vector header = result.getBytes(); + return deserializeHeader(ripple::makeSlice(header)); } std::optional fetchLedgerRange() const override; @@ -838,206 +769,50 @@ public: const override { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; - auto start = std::chrono::system_clock::now(); - CassStatement* statement = cass_prepared_bind(selectObject_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassError rc = cass_statement_bind_bytes( - statement, 0, static_cast(key.data()), 32); - if (rc != CASS_OK) + CassandraStatement statement{selectObject_}; + statement.bindBytes(key); + statement.bindInt(sequence); + CassandraResult result = executeSyncRead(statement); + if (!result) { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc - << ", " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; return {}; } - rc = cass_statement_bind_int64(statement, 1, sequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - CassFuture* fut; - do - { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); - - CassResult const* res = cass_future_get_result(fut); - cass_statement_free(statement); - cass_future_free(fut); - - CassRow const* row = cass_result_first_row(res); - if (!row) - { - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows"; - cass_result_free(res); - return {}; - } - cass_byte_t const* buf; - std::size_t bufSize; - rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - std::vector result{buf, buf + bufSize}; - cass_result_free(res); - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(debug) - << "Fetched from cassandra in " - << std::chrono::duration_cast( - end - start) - .count() - << " microseconds"; - return result; + return result.getBytes(); } std::optional getToken(void const* key) const { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; - auto start = std::chrono::system_clock::now(); - CassStatement* statement = cass_prepared_bind(getToken_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassError rc = cass_statement_bind_bytes( - statement, 0, static_cast(key), 32); - if (rc != CASS_OK) + CassandraStatement statement{getToken_}; + statement.bindBytes(key, 32); + CassandraResult result = executeSyncRead(statement); + if (!result) { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc - << ", " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; return {}; } - CassFuture* fut; - do - { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); - - CassResult const* res = cass_future_get_result(fut); - cass_statement_free(statement); - cass_future_free(fut); - - CassRow const* row = cass_result_first_row(res); - if (!row) - { - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows"; - cass_result_free(res); - return {}; - } - cass_int64_t token; - rc = cass_value_get_int64(cass_row_get_column(row, 0), &token); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - cass_result_free(res); + int64_t token = result.getInt64(); if (token == INT64_MAX) return {}; - return token + 1; + else + return token + 1; } std::optional fetchTransaction(ripple::uint256 const& hash) const override { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; - auto start = std::chrono::system_clock::now(); - CassStatement* statement = cass_prepared_bind(selectTransaction_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassError rc = cass_statement_bind_bytes( - statement, 0, static_cast(hash.data()), 32); - if (rc != CASS_OK) + CassandraStatement statement{selectTransaction_}; + statement.bindBytes(hash); + CassandraResult result = executeSyncRead(statement); + if (!result) { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc - << ", " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; return {}; } - CassFuture* fut; - do - { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); - - CassResult const* res = cass_future_get_result(fut); - cass_statement_free(statement); - cass_future_free(fut); - - CassRow const* row = cass_result_first_row(res); - if (!row) - { - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows"; - cass_result_free(res); - return {}; - } - cass_byte_t const* txBuf; - std::size_t txBufSize; - rc = cass_value_get_bytes( - cass_row_get_column(row, 0), &txBuf, &txBufSize); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - std::vector txResult{txBuf, txBuf + txBufSize}; - cass_byte_t const* metaBuf; - std::size_t metaBufSize; - rc = cass_value_get_bytes( - cass_row_get_column(row, 0), &metaBuf, &metaBufSize); - if (rc != CASS_OK) - { - cass_result_free(res); - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - std::vector metaResult{metaBuf, metaBuf + metaBufSize}; - cass_result_free(res); - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(debug) - << "Fetched from cassandra in " - << std::chrono::duration_cast( - end - start) - .count() - << " microseconds"; - return {{txResult, metaResult}}; + return {{result.getBytes(), result.getBytes()}}; } LedgerPage fetchLedgerPage( @@ -1046,8 +821,7 @@ public: std::uint32_t limit) const override { BOOST_LOG_TRIVIAL(debug) << "Starting doUpperBound"; - CassStatement* statement = cass_prepared_bind(upperBound_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + CassandraStatement statement{selectLedgerPage_}; int64_t intCursor = INT64_MIN; if (cursor) @@ -1056,91 +830,23 @@ public: if (token) intCursor = *token; } + statement.bindInt(intCursor); + statement.bindInt(ledgerSequence); + statement.bindInt(ledgerSequence); + statement.bindInt(limit); - CassError rc = cass_statement_bind_int64(statement, 0, intCursor); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra hash to doUpperBound query: " << rc - << ", " << cass_error_desc(rc); - return {}; - } + CassandraResult result = executeSyncRead(statement); - rc = cass_statement_bind_int64(statement, 1, ledgerSequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra seq to doUpperBound query: " << rc << ", " - << cass_error_desc(rc); - return {}; - } - rc = cass_statement_bind_int64(statement, 2, ledgerSequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra seq to doUpperBound query: " << rc << ", " - << cass_error_desc(rc); - return {}; - } - - rc = cass_statement_bind_int32(statement, 3, limit); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra limit to doUpperBound query: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - - CassFuture* fut; - do - { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); - - CassResult const* res = cass_future_get_result(fut); - cass_statement_free(statement); - cass_future_free(fut); - - BOOST_LOG_TRIVIAL(debug) << "doUpperBound - got keys"; + BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys"; std::vector keys; - CassIterator* iter = cass_iterator_from_result(res); - while (cass_iterator_next(iter)) + do { - CassRow const* row = cass_iterator_get_row(iter); + keys.push_back(result.getUInt256()); + } while (result.nextRow()); - cass_byte_t const* outData; - std::size_t outSize; - - CassValue const* hash = cass_row_get_column(row, 0); - rc = cass_value_get_bytes(hash, &outData, &outSize); - if (rc != CASS_OK) - { - cass_iterator_free(iter); - - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - keys.push_back(ripple::uint256::fromVoid(outData)); - } BOOST_LOG_TRIVIAL(debug) - << "doUpperBound - populated keys. num keys = " << keys.size(); + << __func__ << " - populated keys. num keys = " << keys.size(); if (keys.size()) { std::vector results; @@ -1162,115 +868,29 @@ public: std::uint32_t limit, std::optional const& cursor) const override { - BOOST_LOG_TRIVIAL(debug) << "Starting doBookOffers"; - CassStatement* statement = cass_prepared_bind(getBook_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassError rc = cass_statement_bind_bytes( - statement, 0, static_cast(book.data()), 32); - - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra book to doBookOffers query: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - - rc = cass_statement_bind_int64(statement, 1, sequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra sequence to doBookOffers query: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - rc = cass_statement_bind_int64(statement, 2, sequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra deleted_at to doBookOffers query: " << rc - << ", " << cass_error_desc(rc); - return {}; - } + CassandraStatement statement{getBook_}; + statement.bindBytes(book); + statement.bindInt(sequence); + statement.bindInt(sequence); if (cursor) - rc = cass_statement_bind_bytes( - statement, - 3, - static_cast(cursor->data()), - 32); + statement.bindBytes(*cursor); else { ripple::uint256 zero = {}; - rc = cass_statement_bind_bytes( - statement, 3, static_cast(zero.data()), 32); + statement.bindBytes(zero); } + statement.bindInt(limit); + CassandraResult result = executeSyncRead(statement); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra book to doBookOffers query: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - rc = cass_statement_bind_int64(statement, 4, limit); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra limit to doBookOffers query: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - CassFuture* fut; + BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys"; + std::vector keys; do { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); + keys.push_back(result.getUInt256()); + } while (result.nextRow()); - CassResult const* res = cass_future_get_result(fut); - cass_statement_free(statement); - cass_future_free(fut); - - BOOST_LOG_TRIVIAL(debug) << "doUpperBound - got keys"; - std::vector keys; - - CassIterator* iter = cass_iterator_from_result(res); - while (cass_iterator_next(iter)) - { - CassRow const* row = cass_iterator_get_row(iter); - - cass_byte_t const* outData; - std::size_t outSize; - - CassValue const* hash = cass_row_get_column(row, 0); - rc = cass_value_get_bytes(hash, &outData, &outSize); - if (rc != CASS_OK) - { - cass_iterator_free(iter); - - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - keys.push_back(ripple::uint256::fromVoid(outData)); - std::cout << ripple::strHex(keys.back()) << std::endl; - } BOOST_LOG_TRIVIAL(debug) - << "doBookOffers - populated keys. num keys = " << keys.size(); + << __func__ << " - populated keys. num keys = " << keys.size(); if (keys.size()) { std::vector results; @@ -1352,31 +972,9 @@ public: void read(ReadCallbackData& data) const { - CassStatement* statement = cass_prepared_bind(selectTransaction_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassError rc = cass_statement_bind_bytes( - statement, - 0, - static_cast(data.hash.data()), - 32); - if (rc != CASS_OK) - { - size_t batchSize = data.batchSize; - if (++(data.numFinished) == batchSize) - data.cv.notify_all(); - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc - << ", " << cass_error_desc(rc); - return; - } - - CassFuture* fut = cass_session_execute(session_.get(), statement); - - cass_statement_free(statement); - - cass_future_set_callback( - fut, flatMapReadCallback, static_cast(&data)); - cass_future_free(fut); + CassandraStatement statement{selectTransaction_}; + statement.bindBytes(data.hash); + executeAsyncRead(statement, flatMapReadCallback, data); } struct ReadObjectCallbackData @@ -1449,40 +1047,11 @@ public: void readObject(ReadObjectCallbackData& data) const { - CassStatement* statement = cass_prepared_bind(selectObject_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - CassError rc = cass_statement_bind_bytes( - statement, 0, static_cast(data.key.data()), 32); - if (rc != CASS_OK) - { - size_t batchSize = data.batchSize; - if (++(data.numFinished) == batchSize) - data.cv.notify_all(); - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc - << ", " << cass_error_desc(rc); - return; - } - rc = cass_statement_bind_int64(statement, 1, data.sequence); + CassandraStatement statement{selectObject_}; + statement.bindBytes(data.key); + statement.bindInt(data.sequence); - if (rc != CASS_OK) - { - size_t batchSize = data.batchSize; - if (++(data.numFinished) == batchSize) - data.cv.notify_all(); - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) << "Binding Cassandra fetch query: " << rc - << ", " << cass_error_desc(rc); - return; - } - - CassFuture* fut = cass_session_execute(session_.get(), statement); - - cass_statement_free(statement); - - cass_future_set_callback( - fut, flatMapReadObjectCallback, static_cast(&data)); - cass_future_free(fut); + executeAsyncRead(statement, flatMapReadObjectCallback, data); } struct WriteCallbackData @@ -1546,7 +1115,7 @@ public: statement.bindInt(data.sequence); statement.bindBytes(data.blob); - executeAsync(statement, flatMapWriteCallback, data, isRetry); + executeAsyncWrite(statement, flatMapWriteCallback, data, isRetry); } } @@ -1557,7 +1126,7 @@ public: statement.bindBytes(data.key); statement.bindInt(data.createdSequence); statement.bindInt(data.sequence); - executeAsync(statement, flatMapWriteKeyCallback, data, isRetry); + executeAsyncWrite(statement, flatMapWriteKeyCallback, data, isRetry); } void @@ -1570,13 +1139,15 @@ public: statement.bindInt(data.sequence); statement.bindInt(INT64_MAX); - executeAsync(statement, flatMapWriteKeyCallback, data, isRetry); + executeAsyncWrite( + statement, flatMapWriteKeyCallback, data, isRetry); } else if (data.isDeleted) { CassandraStatement statement{getCreated_}; - executeAsync(statement, flatMapGetCreatedCallback, data, isRetry); + executeAsyncWrite( + statement, flatMapGetCreatedCallback, data, isRetry); } } @@ -1587,16 +1158,12 @@ public: assert(data.book); CassandraStatement statement{ (data.isCreated ? insertBook_ : deleteBook_)}; - if (!statement.bindBytes(*data.book)) - throw std::runtime_error("writeBook: bind error"); - if (!statement.bindBytes(data.key)) - throw std::runtime_error("writeBook: bind error"); - if (!statement.bindInt(data.sequence)) - throw std::runtime_error("writeBook: bind error"); + statement.bindBytes(*data.book); + statement.bindBytes(data.key); + statement.bindInt(data.sequence); if (data.isCreated) - if (!statement.bindInt(INT64_MAX)) - throw std::runtime_error("writeBook: bind error"); - executeAsync(statement, flatMapWriteBookCallback, data, isRetry); + statement.bindInt(INT64_MAX); + executeAsyncWrite(statement, flatMapWriteBookCallback, data, isRetry); } void writeLedgerObject( @@ -1649,19 +1216,12 @@ public: for (auto const& account : data.data.accounts) { CassandraStatement statement(insertAccountTx_); - if (!statement.bindBytes(account)) - throw std::runtime_error( - "writeAccountTx : error binding account"); - if (!statement.bindTuple( - data.data.ledgerSequence, data.data.transactionIndex)) - throw std::runtime_error( - "writeAccountTx: error binding ledger seq and txn idx"); + statement.bindBytes(account); + statement.bindIntTuple( + data.data.ledgerSequence, data.data.transactionIndex); - executeAsync( - statement, - flatMapWriteAccountTxCallback, - static_cast(&data), - isRetry); + executeAsyncWrite( + statement, flatMapWriteAccountTxCallback, data, isRetry); } } @@ -1678,6 +1238,7 @@ public: uint32_t currentRetries = 0; + std::atomic refs = 1; WriteTransactionCallbackData( CassandraBackend const* f, std::string&& hash, @@ -1697,19 +1258,13 @@ public: writeTransaction(WriteTransactionCallbackData& data, bool isRetry) const { CassandraStatement statement{insertTransaction_}; - if (!statement.bindBytes(data.hash)) - throw std::runtime_error("writeTransaction: error binding hash"); - if (!statement.bindInt(data.sequence)) - throw std::runtime_error( - "writeTransaction: error binding sequence"); - if (!statement.bindBytes(data.transaction)) - throw std::runtime_error( - "writeTransaction: error binding transaction"); - if (!statement.bindBytes(data.metadata)) - throw std::runtime_error( - "writeTransaction: error binding transaction"); + statement.bindBytes(data.hash); + statement.bindInt(data.sequence); + statement.bindBytes(data.transaction); + statement.bindBytes(data.metadata); - executeAsync(statement, flatMapWriteTransactionCallback, data, isRetry); + executeAsyncWrite( + statement, flatMapWriteTransactionCallback, data, isRetry); } void writeTransaction( @@ -1743,6 +1298,21 @@ public: syncCv_.wait(lck, [this]() { return numRequestsOutstanding_ == 0; }); } + void + finishAsyncWrite() const + { + --numRequestsOutstanding_; + throttleCv_.notify_all(); + if (numRequestsOutstanding_ == 0) + syncCv_.notify_all(); + } + + boost::asio::io_context& + getIOContext() const + { + return ioContext_; + } + friend void flatMapWriteCallback(CassFuture* fut, void* cbData); friend void @@ -1765,13 +1335,8 @@ public: friend void flatMapGetCreatedCallback(CassFuture* fut, void* cbData); - template void - executeAsync( - CassandraStatement const& statement, - T callback, - S& callbackData, - bool isRetry) const + waitIfNeccessary(bool isRetry) const { { std::unique_lock lck(throttleMutex_); @@ -1786,12 +1351,127 @@ public: }); } } + } + + template + void + executeAsyncHelper( + CassandraStatement const& statement, + T callback, + S& callbackData) const + { CassFuture* fut = cass_session_execute(session_.get(), statement.get()); cass_future_set_callback( fut, callback, static_cast(&callbackData)); cass_future_free(fut); } + template + void + executeAsyncWrite( + CassandraStatement const& statement, + T callback, + S& callbackData, + bool isRetry) const + { + waitIfNeccessary(isRetry); + executeAsyncHelper(statement, callback, callbackData); + } + template + void + executeAsyncRead( + CassandraStatement const& statement, + T callback, + S& callbackData) const + { + executeAsyncHelper(statement, callback, callbackData); + } + void + executeSyncWrite(CassandraStatement const& statement) const + { + CassFuture* fut; + CassError rc; + do + { + fut = cass_session_execute(session_.get(), statement.get()); + rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Cassandra sync write error"; + ss << ", retrying"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + } while (rc != CASS_OK); + cass_future_free(fut); + } + + bool + executeSyncUpdate(CassandraStatement const& statement) const + { + CassFuture* fut; + CassError rc; + do + { + fut = cass_session_execute(session_.get(), statement.get()); + rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Cassandra sync write error"; + ss << ", retrying"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + } while (rc != CASS_OK); + CassResult const* res = cass_future_get_result(fut); + cass_future_free(fut); + + CassRow const* row = cass_result_first_row(res); + if (!row) + { + BOOST_LOG_TRIVIAL(error) << "executeSyncUpdate - no rows"; + cass_result_free(res); + return false; + } + cass_bool_t success; + rc = cass_value_get_bool(cass_row_get_column(row, 0), &success); + if (rc != CASS_OK) + { + cass_result_free(res); + BOOST_LOG_TRIVIAL(error) + << "executeSyncUpdate - error getting result " << rc << ", " + << cass_error_desc(rc); + return false; + } + cass_result_free(res); + return success == cass_true; + } + + CassandraResult + executeSyncRead(CassandraStatement const& statement) const + { + CassFuture* fut; + CassError rc; + do + { + fut = cass_session_execute(session_.get(), statement.get()); + rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Cassandra executeSyncRead error"; + ss << ", retrying"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + } while (rc != CASS_OK); + + CassResult const* res = cass_future_get_result(fut); + cass_future_free(fut); + return {res}; + } }; } // namespace Backend