Bug fixes

* Handle empty ranges in initial download
* Don't skip records in last marker
* Add identifier for timed out Cassandra writes
This commit is contained in:
CJ Cobb
2022-01-12 17:37:22 +00:00
parent 6e6f47421d
commit d6e8e0bcde
3 changed files with 58 additions and 22 deletions

View File

@@ -16,8 +16,8 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func)
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra write error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying in " << wait.count()
<< " milliseconds";
<< cass_error_desc(rc) << " id= " << requestParams.toString()
<< ", retrying in " << wait.count() << " milliseconds";
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
@@ -52,9 +52,14 @@ struct WriteCallbackData
std::function<void(WriteCallbackData<T, B>&, bool)> retry;
uint32_t currentRetries;
std::atomic<int> refs = 1;
std::string id;
WriteCallbackData(CassandraBackend const* b, T&& d, B bind)
: backend(b), data(std::move(d))
WriteCallbackData(
CassandraBackend const* b,
T&& d,
B bind,
std::string const& identifier)
: backend(b), data(std::move(d)), id(identifier)
{
retry = [bind, this](auto& params, bool isRetry) {
auto statement = bind(params);
@@ -83,6 +88,12 @@ struct WriteCallbackData
virtual ~WriteCallbackData()
{
}
std::string
toString()
{
return id;
}
};
template <class T, class B>
struct BulkWriteCallbackData : public WriteCallbackData<T, B>
@@ -97,7 +108,7 @@ struct BulkWriteCallbackData : public WriteCallbackData<T, B>
std::atomic_int& r,
std::mutex& m,
std::condition_variable& c)
: WriteCallbackData<T, B>(b, std::move(d), bind)
: WriteCallbackData<T, B>(b, std::move(d), bind, "bulk")
, numRemaining(r)
, mtx(m)
, cv(c)
@@ -124,9 +135,13 @@ struct BulkWriteCallbackData : public WriteCallbackData<T, B>
template <class T, class B>
void
makeAndExecuteAsyncWrite(CassandraBackend const* b, T&& d, B bind)
makeAndExecuteAsyncWrite(
CassandraBackend const* b,
T&& d,
B bind,
std::string const& id)
{
auto* cb = new WriteCallbackData(b, std::move(d), bind);
auto* cb = new WriteCallbackData(b, std::move(d), bind, id);
cb->start();
}
template <class T, class B>
@@ -153,14 +168,17 @@ CassandraBackend::doWriteLedgerObject(
BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra";
if (range)
makeAndExecuteAsyncWrite(
this, std::move(std::make_tuple(seq, key)), [this](auto& params) {
this,
std::move(std::make_tuple(seq, key)),
[this](auto& params) {
auto& [sequence, key] = params.data;
CassandraStatement statement{insertDiff_};
statement.bindNextInt(sequence);
statement.bindNextBytes(key);
return statement;
});
},
"ledger_diff");
makeAndExecuteAsyncWrite(
this,
std::move(std::make_tuple(std::move(key), seq, std::move(blob))),
@@ -172,7 +190,8 @@ CassandraBackend::doWriteLedgerObject(
statement.bindNextInt(sequence);
statement.bindNextBytes(blob);
return statement;
});
},
"ledger_object");
}
void
CassandraBackend::writeSuccessor(
@@ -183,8 +202,8 @@ CassandraBackend::writeSuccessor(
BOOST_LOG_TRIVIAL(trace)
<< "Writing successor. key = " << key
<< " seq = " << std::to_string(seq) << " successor = " << successor;
assert(key.size());
assert(successor.size());
assert(key.size() != 0);
assert(successor.size() != 0);
makeAndExecuteAsyncWrite(
this,
std::move(std::make_tuple(std::move(key), seq, std::move(successor))),
@@ -196,7 +215,8 @@ CassandraBackend::writeSuccessor(
statement.bindNextInt(sequence);
statement.bindNextBytes(successor);
return statement;
});
},
"successor");
}
void
CassandraBackend::writeLedger(
@@ -212,7 +232,8 @@ CassandraBackend::writeLedger(
statement.bindNextInt(sequence);
statement.bindNextBytes(header);
return statement;
});
},
"ledger");
makeAndExecuteAsyncWrite(
this,
std::move(std::make_tuple(ledgerInfo.hash, ledgerInfo.seq)),
@@ -222,7 +243,8 @@ CassandraBackend::writeLedger(
statement.bindNextBytes(hash);
statement.bindNextInt(sequence);
return statement;
});
},
"ledger_hash");
ledgerSequence_ = ledgerInfo.seq;
}
void
@@ -247,7 +269,8 @@ CassandraBackend::writeAccountTransactions(
statement.bindNextIntTuple(lgrSeq, txnIdx);
statement.bindNextBytes(hash);
return statement;
});
},
"account_tx");
}
}
}
@@ -263,12 +286,15 @@ CassandraBackend::writeTransaction(
std::string hashCpy = hash;
makeAndExecuteAsyncWrite(
this, std::move(std::make_pair(seq, hash)), [this](auto& params) {
this,
std::move(std::make_pair(seq, hash)),
[this](auto& params) {
CassandraStatement statement{insertLedgerTransaction_};
statement.bindNextInt(params.data.first);
statement.bindNextBytes(params.data.second);
return statement;
});
},
"ledger_transaction");
makeAndExecuteAsyncWrite(
this,
std::move(std::make_tuple(
@@ -286,7 +312,8 @@ CassandraBackend::writeTransaction(
statement.bindNextBytes(transaction);
statement.bindNextBytes(metadata);
return statement;
});
},
"transaction");
}
std::optional<LedgerRange>

View File

@@ -599,7 +599,7 @@ public:
for (int i = 0; i < cur_->ledger_objects().objects_size(); ++i)
{
auto& obj = *(cur_->mutable_ledger_objects()->mutable_objects(i));
if (!more)
if (!more && nextPrefix_ != 0x00)
{
if (((unsigned char)obj.key()[0]) >= nextPrefix_)
continue;
@@ -721,7 +721,9 @@ ETLSourceImpl<Derived>::loadInitialLedger(
BOOST_LOG_TRIVIAL(debug)
<< "Finished a marker. "
<< "Current number of finished = " << numFinished;
edgeKeys.push_back(ptr->getLastKey());
std::string lastKey = ptr->getLastKey();
if (lastKey.size())
edgeKeys.push_back(ptr->getLastKey());
}
if (result == AsyncCallData::CallStatus::ERRORED)
{

View File

@@ -349,6 +349,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
assert(key);
cacheUpdates.push_back(
{*key, {obj.mutable_data()->begin(), obj.mutable_data()->end()}});
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " key = " << ripple::strHex(*key)
<< " - mod type = " << obj.mod_type();
if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::MODIFIED &&
!rawData.object_neighbors_included())
@@ -356,9 +359,13 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " object neighbors not included. using cache";
assert(backend_->cache().isFull());
if (!backend_->cache().isFull())
throw std::runtime_error(
"Cache is not full, but object neighbors were not "
"included");
auto blob = obj.mutable_data();
bool checkBookBase = false;
bool isDeleted = blob->size() == 0;
bool isDeleted = (blob->size() == 0);
if (isDeleted)
{
auto old = backend_->cache().get(*key, lgrInfo.seq - 1);