mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-27 07:05:54 +00:00
Fix ETL race condition problem (#1132)
Wait for previous publish being finished to switch to writer.
This commit is contained in:
@@ -40,6 +40,16 @@ LedgerCache::latestLedgerSequence() const
|
|||||||
return latestSeq_;
|
return latestSeq_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
LedgerCache::waitUntilCacheContainsSeq(uint32_t seq)
|
||||||
|
{
|
||||||
|
if (disabled_)
|
||||||
|
return;
|
||||||
|
std::unique_lock lock(mtx_);
|
||||||
|
cv_.wait(lock, [this, seq] { return latestSeq_ >= seq; });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
LedgerCache::update(std::vector<LedgerObject> const& objs, uint32_t seq, bool isBackground)
|
LedgerCache::update(std::vector<LedgerObject> const& objs, uint32_t seq, bool isBackground)
|
||||||
{
|
{
|
||||||
@@ -72,6 +82,7 @@ LedgerCache::update(std::vector<LedgerObject> const& objs, uint32_t seq, bool is
|
|||||||
deletes_.insert(obj.key);
|
deletes_.insert(obj.key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cv_.notify_all();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,7 @@
|
|||||||
#include <ripple/basics/base_uint.h>
|
#include <ripple/basics/base_uint.h>
|
||||||
#include <ripple/basics/hardened_hash.h>
|
#include <ripple/basics/hardened_hash.h>
|
||||||
|
|
||||||
|
#include <condition_variable>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <shared_mutex>
|
#include <shared_mutex>
|
||||||
@@ -67,6 +68,7 @@ class LedgerCache {
|
|||||||
std::map<ripple::uint256, CacheEntry> map_;
|
std::map<ripple::uint256, CacheEntry> map_;
|
||||||
|
|
||||||
mutable std::shared_mutex mtx_;
|
mutable std::shared_mutex mtx_;
|
||||||
|
std::condition_variable_any cv_;
|
||||||
uint32_t latestSeq_ = 0;
|
uint32_t latestSeq_ = 0;
|
||||||
std::atomic_bool full_ = false;
|
std::atomic_bool full_ = false;
|
||||||
std::atomic_bool disabled_ = false;
|
std::atomic_bool disabled_ = false;
|
||||||
@@ -164,6 +166,9 @@ public:
|
|||||||
*/
|
*/
|
||||||
float
|
float
|
||||||
getSuccessorHitRate() const;
|
getSuccessorHitRate() const;
|
||||||
|
|
||||||
|
void
|
||||||
|
waitUntilCacheContainsSeq(uint32_t seq);
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace data
|
} // namespace data
|
||||||
|
|||||||
@@ -47,6 +47,10 @@ ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors)
|
|||||||
if (finishSequence_ && startSequence > *finishSequence_)
|
if (finishSequence_ && startSequence > *finishSequence_)
|
||||||
return {};
|
return {};
|
||||||
|
|
||||||
|
LOG(log_.debug()) << "Wait for cache containing seq " << startSequence - 1
|
||||||
|
<< " current cache last seq =" << backend_->cache().latestLedgerSequence();
|
||||||
|
backend_->cache().waitUntilCacheContainsSeq(startSequence - 1);
|
||||||
|
|
||||||
LOG(log_.debug()) << "Starting etl pipeline";
|
LOG(log_.debug()) << "Starting etl pipeline";
|
||||||
state_.isWriting = true;
|
state_.isWriting = true;
|
||||||
|
|
||||||
|
|||||||
@@ -89,9 +89,15 @@ public:
|
|||||||
{
|
{
|
||||||
LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence;
|
LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence;
|
||||||
|
|
||||||
auto response = loadBalancer_->fetchLedger(
|
auto const isCacheFull = backend_->cache().isFull();
|
||||||
sequence, true, !backend_->cache().isFull() || backend_->cache().latestLedgerSequence() >= sequence
|
auto const isLedgerCached = backend_->cache().latestLedgerSequence() >= sequence;
|
||||||
);
|
if (isLedgerCached) {
|
||||||
|
LOG(log_.info()) << sequence << " is already cached, the current latest seq in cache is "
|
||||||
|
<< backend_->cache().latestLedgerSequence() << " and the cache is "
|
||||||
|
<< (isCacheFull ? "full" : "not full");
|
||||||
|
}
|
||||||
|
|
||||||
|
auto response = loadBalancer_->fetchLedger(sequence, true, !isCacheFull || isLedgerCached);
|
||||||
if (response)
|
if (response)
|
||||||
LOG(log_.trace()) << "GetLedger reply = " << response->DebugString();
|
LOG(log_.trace()) << "GetLedger reply = " << response->DebugString();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user