diff --git a/src/data/LedgerCache.cpp b/src/data/LedgerCache.cpp index c2137ac3..9a07a1ae 100644 --- a/src/data/LedgerCache.cpp +++ b/src/data/LedgerCache.cpp @@ -40,6 +40,16 @@ LedgerCache::latestLedgerSequence() const return latestSeq_; } +void +LedgerCache::waitUntilCacheContainsSeq(uint32_t seq) +{ + if (disabled_) + return; + std::unique_lock lock(mtx_); + cv_.wait(lock, [this, seq] { return latestSeq_ >= seq; }); + return; +} + void LedgerCache::update(std::vector const& objs, uint32_t seq, bool isBackground) { @@ -72,6 +82,7 @@ LedgerCache::update(std::vector const& objs, uint32_t seq, bool is deletes_.insert(obj.key); } } + cv_.notify_all(); } } diff --git a/src/data/LedgerCache.h b/src/data/LedgerCache.h index 22be53d3..6188a468 100644 --- a/src/data/LedgerCache.h +++ b/src/data/LedgerCache.h @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -67,6 +68,7 @@ class LedgerCache { std::map map_; mutable std::shared_mutex mtx_; + std::condition_variable_any cv_; uint32_t latestSeq_ = 0; std::atomic_bool full_ = false; std::atomic_bool disabled_ = false; @@ -164,6 +166,9 @@ public: */ float getSuccessorHitRate() const; + + void + waitUntilCacheContainsSeq(uint32_t seq); }; } // namespace data diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index 2464e730..c5da9ec5 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -47,6 +47,10 @@ ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors) if (finishSequence_ && startSequence > *finishSequence_) return {}; + LOG(log_.debug()) << "Wait for cache containing seq " << startSequence - 1 + << " current cache last seq =" << backend_->cache().latestLedgerSequence(); + backend_->cache().waitUntilCacheContainsSeq(startSequence - 1); + LOG(log_.debug()) << "Starting etl pipeline"; state_.isWriting = true; diff --git a/src/etl/impl/LedgerFetcher.h b/src/etl/impl/LedgerFetcher.h index 1e6bfd26..4f8e58bd 100644 --- a/src/etl/impl/LedgerFetcher.h +++ b/src/etl/impl/LedgerFetcher.h @@ -89,9 +89,15 @@ public: { LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence; - auto response = loadBalancer_->fetchLedger( - sequence, true, !backend_->cache().isFull() || backend_->cache().latestLedgerSequence() >= sequence - ); + auto const isCacheFull = backend_->cache().isFull(); + auto const isLedgerCached = backend_->cache().latestLedgerSequence() >= sequence; + if (isLedgerCached) { + LOG(log_.info()) << sequence << " is already cached, the current latest seq in cache is " + << backend_->cache().latestLedgerSequence() << " and the cache is " + << (isCacheFull ? "full" : "not full"); + } + + auto response = loadBalancer_->fetchLedger(sequence, true, !isCacheFull || isLedgerCached); if (response) LOG(log_.trace()) << "GetLedger reply = " << response->DebugString();