From d6e8e0bcdec80228205ab70cb6ea85d40ece195c Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Wed, 12 Jan 2022 17:37:22 +0000 Subject: [PATCH] Bug fixes * Handle empty ranges in initial download * Don't skip records in last marker * Add identifier for timed out Cassandra writes --- src/backend/CassandraBackend.cpp | 65 ++++++++++++++++++++++---------- src/etl/ETLSource.cpp | 6 ++- src/etl/ReportingETL.cpp | 9 ++++- 3 files changed, 58 insertions(+), 22 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 4b654b58f..b13ccf1ad 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -16,8 +16,8 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); BOOST_LOG_TRIVIAL(error) << "ERROR!!! Cassandra write error: " << rc << ", " - << cass_error_desc(rc) << ", retrying in " << wait.count() - << " milliseconds"; + << cass_error_desc(rc) << " id= " << requestParams.toString() + << ", retrying in " << wait.count() << " milliseconds"; ++requestParams.currentRetries; std::shared_ptr timer = std::make_shared( @@ -52,9 +52,14 @@ struct WriteCallbackData std::function&, bool)> retry; uint32_t currentRetries; std::atomic refs = 1; + std::string id; - WriteCallbackData(CassandraBackend const* b, T&& d, B bind) - : backend(b), data(std::move(d)) + WriteCallbackData( + CassandraBackend const* b, + T&& d, + B bind, + std::string const& identifier) + : backend(b), data(std::move(d)), id(identifier) { retry = [bind, this](auto& params, bool isRetry) { auto statement = bind(params); @@ -83,6 +88,12 @@ struct WriteCallbackData virtual ~WriteCallbackData() { } + + std::string + toString() + { + return id; + } }; template struct BulkWriteCallbackData : public WriteCallbackData @@ -97,7 +108,7 @@ struct BulkWriteCallbackData : public WriteCallbackData std::atomic_int& r, std::mutex& m, std::condition_variable& c) - : WriteCallbackData(b, std::move(d), bind) + : WriteCallbackData(b, std::move(d), bind, "bulk") , numRemaining(r) , mtx(m) , cv(c) @@ -124,9 +135,13 @@ struct BulkWriteCallbackData : public WriteCallbackData template void -makeAndExecuteAsyncWrite(CassandraBackend const* b, T&& d, B bind) +makeAndExecuteAsyncWrite( + CassandraBackend const* b, + T&& d, + B bind, + std::string const& id) { - auto* cb = new WriteCallbackData(b, std::move(d), bind); + auto* cb = new WriteCallbackData(b, std::move(d), bind, id); cb->start(); } template @@ -153,14 +168,17 @@ CassandraBackend::doWriteLedgerObject( BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; if (range) makeAndExecuteAsyncWrite( - this, std::move(std::make_tuple(seq, key)), [this](auto& params) { + this, + std::move(std::make_tuple(seq, key)), + [this](auto& params) { auto& [sequence, key] = params.data; CassandraStatement statement{insertDiff_}; statement.bindNextInt(sequence); statement.bindNextBytes(key); return statement; - }); + }, + "ledger_diff"); makeAndExecuteAsyncWrite( this, std::move(std::make_tuple(std::move(key), seq, std::move(blob))), @@ -172,7 +190,8 @@ CassandraBackend::doWriteLedgerObject( statement.bindNextInt(sequence); statement.bindNextBytes(blob); return statement; - }); + }, + "ledger_object"); } void CassandraBackend::writeSuccessor( @@ -183,8 +202,8 @@ CassandraBackend::writeSuccessor( BOOST_LOG_TRIVIAL(trace) << "Writing successor. key = " << key << " seq = " << std::to_string(seq) << " successor = " << successor; - assert(key.size()); - assert(successor.size()); + assert(key.size() != 0); + assert(successor.size() != 0); makeAndExecuteAsyncWrite( this, std::move(std::make_tuple(std::move(key), seq, std::move(successor))), @@ -196,7 +215,8 @@ CassandraBackend::writeSuccessor( statement.bindNextInt(sequence); statement.bindNextBytes(successor); return statement; - }); + }, + "successor"); } void CassandraBackend::writeLedger( @@ -212,7 +232,8 @@ CassandraBackend::writeLedger( statement.bindNextInt(sequence); statement.bindNextBytes(header); return statement; - }); + }, + "ledger"); makeAndExecuteAsyncWrite( this, std::move(std::make_tuple(ledgerInfo.hash, ledgerInfo.seq)), @@ -222,7 +243,8 @@ CassandraBackend::writeLedger( statement.bindNextBytes(hash); statement.bindNextInt(sequence); return statement; - }); + }, + "ledger_hash"); ledgerSequence_ = ledgerInfo.seq; } void @@ -247,7 +269,8 @@ CassandraBackend::writeAccountTransactions( statement.bindNextIntTuple(lgrSeq, txnIdx); statement.bindNextBytes(hash); return statement; - }); + }, + "account_tx"); } } } @@ -263,12 +286,15 @@ CassandraBackend::writeTransaction( std::string hashCpy = hash; makeAndExecuteAsyncWrite( - this, std::move(std::make_pair(seq, hash)), [this](auto& params) { + this, + std::move(std::make_pair(seq, hash)), + [this](auto& params) { CassandraStatement statement{insertLedgerTransaction_}; statement.bindNextInt(params.data.first); statement.bindNextBytes(params.data.second); return statement; - }); + }, + "ledger_transaction"); makeAndExecuteAsyncWrite( this, std::move(std::make_tuple( @@ -286,7 +312,8 @@ CassandraBackend::writeTransaction( statement.bindNextBytes(transaction); statement.bindNextBytes(metadata); return statement; - }); + }, + "transaction"); } std::optional diff --git a/src/etl/ETLSource.cpp b/src/etl/ETLSource.cpp index 864ce15d1..9029ad85f 100644 --- a/src/etl/ETLSource.cpp +++ b/src/etl/ETLSource.cpp @@ -599,7 +599,7 @@ public: for (int i = 0; i < cur_->ledger_objects().objects_size(); ++i) { auto& obj = *(cur_->mutable_ledger_objects()->mutable_objects(i)); - if (!more) + if (!more && nextPrefix_ != 0x00) { if (((unsigned char)obj.key()[0]) >= nextPrefix_) continue; @@ -721,7 +721,9 @@ ETLSourceImpl::loadInitialLedger( BOOST_LOG_TRIVIAL(debug) << "Finished a marker. " << "Current number of finished = " << numFinished; - edgeKeys.push_back(ptr->getLastKey()); + std::string lastKey = ptr->getLastKey(); + if (lastKey.size()) + edgeKeys.push_back(ptr->getLastKey()); } if (result == AsyncCallData::CallStatus::ERRORED) { diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 09f01a492..4fcc0b7a0 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -349,6 +349,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) assert(key); cacheUpdates.push_back( {*key, {obj.mutable_data()->begin(), obj.mutable_data()->end()}}); + BOOST_LOG_TRIVIAL(debug) + << __func__ << " key = " << ripple::strHex(*key) + << " - mod type = " << obj.mod_type(); if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::MODIFIED && !rawData.object_neighbors_included()) @@ -356,9 +359,13 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) BOOST_LOG_TRIVIAL(debug) << __func__ << " object neighbors not included. using cache"; assert(backend_->cache().isFull()); + if (!backend_->cache().isFull()) + throw std::runtime_error( + "Cache is not full, but object neighbors were not " + "included"); auto blob = obj.mutable_data(); bool checkBookBase = false; - bool isDeleted = blob->size() == 0; + bool isDeleted = (blob->size() == 0); if (isDeleted) { auto old = backend_->cache().get(*key, lgrInfo.seq - 1);