cache fixes. remove defer reads flag. check latest sequence in etl

CJ Cobb
2022-03-11 20:01:39 -05:00
parent 59a0b47df4
commit 6fe957ef8f
4 changed files with 9 additions and 16 deletions


@@ -969,8 +969,8 @@ CassandraBackend::open(bool readOnly)
     if (keyspace.empty())
     {
         BOOST_LOG_TRIVIAL(warning)
-            << "No keyspace specified. Using keyspace oceand";
-        keyspace = "oceand";
+            << "No keyspace specified. Using keyspace clio";
+        keyspace = "clio";
     }
     int rf = getInt("replication_factor") ? *getInt("replication_factor") : 3;


@@ -13,7 +13,6 @@ SimpleCache::update(
     uint32_t seq,
     bool isBackground)
 {
-    deferReads_ = true;
     {
         std::unique_lock lck{mtx_};
         if (seq > latestSeq_)
@@ -41,13 +40,10 @@ SimpleCache::update(
             }
         }
     }
-    deferReads_ = false;
 }
 std::optional<LedgerObject>
 SimpleCache::getSuccessor(ripple::uint256 const& key, uint32_t seq) const
 {
-    if (deferReads_)
-        return {};
     if (!full_)
         return {};
     std::shared_lock{mtx_};
@@ -61,8 +57,6 @@ SimpleCache::getSuccessor(ripple::uint256 const& key, uint32_t seq) const
 std::optional<LedgerObject>
 SimpleCache::getPredecessor(ripple::uint256 const& key, uint32_t seq) const
 {
-    if (deferReads_)
-        return {};
     if (!full_)
         return {};
     std::shared_lock lck{mtx_};
@@ -77,8 +71,6 @@ SimpleCache::getPredecessor(ripple::uint256 const& key, uint32_t seq) const
 std::optional<Blob>
 SimpleCache::get(ripple::uint256 const& key, uint32_t seq) const
 {
-    if (deferReads_)
-        return {};
     if (seq > latestSeq_)
         return {};
     std::shared_lock lck{mtx_};
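
With deferReads_ removed, the read paths (get, getSuccessor, getPredecessor) no longer return empty optionals while an update is in flight; they simply wait on the shared_mutex until the writer releases its exclusive lock. The following is a minimal sketch of that reader/writer pattern, not the actual SimpleCache interface; TinyCache and its members are illustrative names only.

#include <map>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <vector>

// Illustrative sketch of the pattern SimpleCache relies on once
// deferReads_ is gone: writers take an exclusive lock, readers take a
// shared lock and briefly block behind an in-progress update.
class TinyCache
{
    std::map<int, std::vector<unsigned char>> map_;
    mutable std::shared_mutex mtx_;

public:
    void
    update(int key, std::vector<unsigned char> blob)
    {
        // Exclusive lock: readers queue up behind the update instead of
        // bailing out early with an empty result.
        std::unique_lock lck{mtx_};
        map_[key] = std::move(blob);
    }

    std::optional<std::vector<unsigned char>>
    get(int key) const
    {
        // The lock must be named. An unnamed temporary such as
        // std::shared_lock{mtx_}; locks and unlocks immediately, leaving
        // the read unprotected.
        std::shared_lock lck{mtx_};
        if (auto it = map_.find(key); it != map_.end())
            return it->second;
        return {};
    }
};

The trade-off is a small amount of read latency while an update holds the exclusive lock, in exchange for readers never being turned away with empty results mid-update.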


@@ -19,9 +19,6 @@ class SimpleCache
     };
     std::map<ripple::uint256, CacheEntry> map_;
     mutable std::shared_mutex mtx_;
-    // flag set in update to prevent reads from starving the update, and to
-    // prevent reads from piling up behind the update
-    std::atomic_bool deferReads_ = false;
     uint32_t latestSeq_ = 0;
     std::atomic_bool full_ = false;
     // temporary set to prevent background thread from writing already deleted


@@ -365,8 +365,8 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
     {
         BOOST_LOG_TRIVIAL(debug)
             << __func__ << " object neighbors not included. using cache";
-        assert(backend_->cache().isFull());
-        if (!backend_->cache().isFull())
+        if (!backend_->cache().isFull() ||
+            backend_->cache().latestLedgerSequence() != lgrInfo.seq - 1)
             throw std::runtime_error(
                 "Cache is not full, but object neighbors were not "
                 "included");
@@ -420,7 +420,11 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
     {
         BOOST_LOG_TRIVIAL(debug)
             << __func__ << " object neighbors not included. using cache";
-        assert(backend_->cache().isFull());
+        if (!backend_->cache().isFull() ||
+            backend_->cache().latestLedgerSequence() != lgrInfo.seq)
+            throw std::runtime_error(
+                "Cache is not full, but object neighbors were not "
+                "included");
     for (auto const& obj : cacheUpdates)
     {
         if (modified.count(obj.key))
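
The ETL change replaces the debug-only assert with a runtime check: before using the cache to supply object neighbors, the loader now verifies both that the cache is full and that it sits at the expected sequence (the parent ledger before the cache update, the new ledger after it). Unlike assert, the check is not compiled out under NDEBUG. Below is a hedged sketch of that guard under assumed names; CacheView and requireCacheAt are stand-ins, not the real backend interface.

#include <cstdint>
#include <iostream>
#include <stdexcept>

// Stand-in for the backend cache interface referenced in the diff.
struct CacheView
{
    bool full;
    std::uint32_t latestSeq;

    bool
    isFull() const
    {
        return full;
    }
    std::uint32_t
    latestLedgerSequence() const
    {
        return latestSeq;
    }
};

// Throws in all build types when the cache cannot safely stand in for the
// missing object neighbors, rather than being compiled out like assert.
void
requireCacheAt(CacheView const& cache, std::uint32_t expectedSeq)
{
    if (!cache.isFull() || cache.latestLedgerSequence() != expectedSeq)
        throw std::runtime_error(
            "Cache is not full, but object neighbors were not included");
}

int
main()
{
    CacheView cache{true, 70'000'000};
    std::uint32_t ledgerSeq = 70'000'001;

    // Mirrors the two call sites in buildNextLedger(): the cache must be at
    // the parent ledger before the update and at the new ledger afterwards.
    requireCacheAt(cache, ledgerSeq - 1);
    std::cout << "cache is usable for neighbor lookups\n";
}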