mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Improve online delete backend locking
This commit is contained in:
committed by
Nik Bougalis
parent
894d3463ce
commit
d88a7c73b4
@@ -426,22 +426,22 @@ SHAMapStoreImp::run()
|
||||
default:;
|
||||
}
|
||||
|
||||
std::string nextArchiveDir =
|
||||
dbRotating_->getWritableBackend()->getName();
|
||||
lastRotated_ = validatedSeq;
|
||||
std::shared_ptr<NodeStore::Backend> oldBackend;
|
||||
{
|
||||
std::lock_guard lock(dbRotating_->peekMutex());
|
||||
|
||||
state_db_.setState(SavedState{
|
||||
newBackend->getName(), nextArchiveDir, lastRotated_});
|
||||
clearCaches(validatedSeq);
|
||||
oldBackend =
|
||||
dbRotating_->rotateBackends(std::move(newBackend), lock);
|
||||
}
|
||||
dbRotating_->rotateWithLock(
|
||||
[&](std::string const& writableBackendName) {
|
||||
SavedState savedState;
|
||||
savedState.writableDb = newBackend->getName();
|
||||
savedState.archiveDb = writableBackendName;
|
||||
savedState.lastRotated = lastRotated_;
|
||||
state_db_.setState(savedState);
|
||||
|
||||
clearCaches(validatedSeq);
|
||||
|
||||
return std::move(newBackend);
|
||||
});
|
||||
|
||||
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
|
||||
|
||||
oldBackend->setDeletePath();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,16 +47,13 @@ public:
|
||||
virtual TaggedCache<uint256, NodeObject> const&
|
||||
getPositiveCache() = 0;
|
||||
|
||||
virtual std::mutex&
|
||||
peekMutex() const = 0;
|
||||
/** Rotates the backends.
|
||||
|
||||
virtual std::shared_ptr<Backend> const&
|
||||
getWritableBackend() const = 0;
|
||||
|
||||
virtual std::shared_ptr<Backend>
|
||||
rotateBackends(
|
||||
std::shared_ptr<Backend> newBackend,
|
||||
std::lock_guard<std::mutex> const&) = 0;
|
||||
@param f A function executed before the rotation and under the same lock
|
||||
*/
|
||||
virtual void
|
||||
rotateWithLock(std::function<std::unique_ptr<NodeStore::Backend>(
|
||||
std::string const& writableBackendName)> const& f) = 0;
|
||||
};
|
||||
|
||||
} // namespace NodeStore
|
||||
|
||||
@@ -55,15 +55,54 @@ DatabaseRotatingImp::DatabaseRotatingImp(
|
||||
setParent(parent);
|
||||
}
|
||||
|
||||
std::shared_ptr<Backend>
|
||||
DatabaseRotatingImp::rotateBackends(
|
||||
std::shared_ptr<Backend> newBackend,
|
||||
std::lock_guard<std::mutex> const&)
|
||||
void
|
||||
DatabaseRotatingImp::rotateWithLock(
|
||||
std::function<std::unique_ptr<NodeStore::Backend>(
|
||||
std::string const& writableBackendName)> const& f)
|
||||
{
|
||||
auto oldBackend{std::move(archiveBackend_)};
|
||||
std::lock_guard lock(mutex_);
|
||||
|
||||
auto newBackend = f(writableBackend_->getName());
|
||||
archiveBackend_->setDeletePath();
|
||||
archiveBackend_ = std::move(writableBackend_);
|
||||
writableBackend_ = std::move(newBackend);
|
||||
return oldBackend;
|
||||
}
|
||||
|
||||
std::string
|
||||
DatabaseRotatingImp::getName() const
|
||||
{
|
||||
std::lock_guard lock(mutex_);
|
||||
return writableBackend_->getName();
|
||||
}
|
||||
|
||||
std::int32_t
|
||||
DatabaseRotatingImp::getWriteLoad() const
|
||||
{
|
||||
std::lock_guard lock(mutex_);
|
||||
return writableBackend_->getWriteLoad();
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseRotatingImp::import(Database& source)
|
||||
{
|
||||
auto const backend = [&] {
|
||||
std::lock_guard lock(mutex_);
|
||||
return writableBackend_;
|
||||
}();
|
||||
|
||||
importInternal(*backend, source);
|
||||
}
|
||||
|
||||
bool
|
||||
DatabaseRotatingImp::storeLedger(std::shared_ptr<Ledger const> const& srcLedger)
|
||||
{
|
||||
auto const backend = [&] {
|
||||
std::lock_guard lock(mutex_);
|
||||
return writableBackend_;
|
||||
}();
|
||||
|
||||
return Database::storeLedger(
|
||||
*srcLedger, backend, pCache_, nCache_, nullptr);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -75,7 +114,13 @@ DatabaseRotatingImp::store(
|
||||
{
|
||||
auto nObj = NodeObject::createObject(type, std::move(data), hash);
|
||||
pCache_->canonicalize_replace_cache(hash, nObj);
|
||||
getWritableBackend()->store(nObj);
|
||||
|
||||
auto const backend = [&] {
|
||||
std::lock_guard lock(mutex_);
|
||||
return writableBackend_;
|
||||
}();
|
||||
backend->store(nObj);
|
||||
|
||||
nCache_->erase(hash);
|
||||
storeStats(nObj->getData().size());
|
||||
}
|
||||
@@ -90,6 +135,7 @@ DatabaseRotatingImp::asyncFetch(
|
||||
object = pCache_->fetch(hash);
|
||||
if (object || nCache_->touch_if_exists(hash))
|
||||
return true;
|
||||
|
||||
// Otherwise post a read
|
||||
Database::asyncFetch(hash, seq, pCache_, nCache_);
|
||||
return false;
|
||||
@@ -114,19 +160,48 @@ DatabaseRotatingImp::sweep()
|
||||
std::shared_ptr<NodeObject>
|
||||
DatabaseRotatingImp::fetchFrom(uint256 const& hash, std::uint32_t seq)
|
||||
{
|
||||
Backends b = getBackends();
|
||||
auto nObj = fetchInternal(hash, b.writableBackend);
|
||||
auto [writable, archive] = [&] {
|
||||
std::lock_guard lock(mutex_);
|
||||
return std::make_pair(writableBackend_, archiveBackend_);
|
||||
}();
|
||||
|
||||
// Try to fetch from the writable backend
|
||||
auto nObj = fetchInternal(hash, writable);
|
||||
if (!nObj)
|
||||
{
|
||||
nObj = fetchInternal(hash, b.archiveBackend);
|
||||
// Otherwise try to fetch from the archive backend
|
||||
nObj = fetchInternal(hash, archive);
|
||||
if (nObj)
|
||||
{
|
||||
getWritableBackend()->store(nObj);
|
||||
{
|
||||
// Refresh the writable backend pointer
|
||||
std::lock_guard lock(mutex_);
|
||||
writable = writableBackend_;
|
||||
}
|
||||
|
||||
// Update writable backend with data from the archive backend
|
||||
writable->store(nObj);
|
||||
nCache_->erase(hash);
|
||||
}
|
||||
}
|
||||
return nObj;
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseRotatingImp::for_each(
|
||||
std::function<void(std::shared_ptr<NodeObject>)> f)
|
||||
{
|
||||
auto [writable, archive] = [&] {
|
||||
std::lock_guard lock(mutex_);
|
||||
return std::make_pair(writableBackend_, archiveBackend_);
|
||||
}();
|
||||
|
||||
// Iterate the writable backend
|
||||
writable->for_each(f);
|
||||
|
||||
// Iterate the archive backend
|
||||
archive->for_each(f);
|
||||
}
|
||||
|
||||
} // namespace NodeStore
|
||||
} // namespace ripple
|
||||
|
||||
@@ -49,41 +49,19 @@ public:
|
||||
stopThreads();
|
||||
}
|
||||
|
||||
std::shared_ptr<Backend> const&
|
||||
getWritableBackend() const override
|
||||
{
|
||||
std::lock_guard lock(rotateMutex_);
|
||||
return writableBackend_;
|
||||
}
|
||||
|
||||
std::shared_ptr<Backend>
|
||||
rotateBackends(
|
||||
std::shared_ptr<Backend> newBackend,
|
||||
std::lock_guard<std::mutex> const&) override;
|
||||
|
||||
std::mutex&
|
||||
peekMutex() const override
|
||||
{
|
||||
return rotateMutex_;
|
||||
}
|
||||
void
|
||||
rotateWithLock(
|
||||
std::function<std::unique_ptr<NodeStore::Backend>(
|
||||
std::string const& writableBackendName)> const& f) override;
|
||||
|
||||
std::string
|
||||
getName() const override
|
||||
{
|
||||
return getWritableBackend()->getName();
|
||||
}
|
||||
getName() const override;
|
||||
|
||||
std::int32_t
|
||||
getWriteLoad() const override
|
||||
{
|
||||
return getWritableBackend()->getWriteLoad();
|
||||
}
|
||||
getWriteLoad() const override;
|
||||
|
||||
void
|
||||
import(Database& source) override
|
||||
{
|
||||
importInternal(*getWritableBackend(), source);
|
||||
}
|
||||
import(Database& source) override;
|
||||
|
||||
void
|
||||
store(
|
||||
@@ -105,11 +83,7 @@ public:
|
||||
std::shared_ptr<NodeObject>& object) override;
|
||||
|
||||
bool
|
||||
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override
|
||||
{
|
||||
return Database::storeLedger(
|
||||
*srcLedger, getWritableBackend(), pCache_, nCache_, nullptr);
|
||||
}
|
||||
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override;
|
||||
|
||||
int
|
||||
getDesiredAsyncReadCount(std::uint32_t seq) override
|
||||
@@ -147,31 +121,13 @@ private:
|
||||
|
||||
std::shared_ptr<Backend> writableBackend_;
|
||||
std::shared_ptr<Backend> archiveBackend_;
|
||||
mutable std::mutex rotateMutex_;
|
||||
|
||||
struct Backends
|
||||
{
|
||||
std::shared_ptr<Backend> const& writableBackend;
|
||||
std::shared_ptr<Backend> const& archiveBackend;
|
||||
};
|
||||
|
||||
Backends
|
||||
getBackends() const
|
||||
{
|
||||
std::lock_guard lock(rotateMutex_);
|
||||
return Backends{writableBackend_, archiveBackend_};
|
||||
}
|
||||
mutable std::mutex mutex_;
|
||||
|
||||
std::shared_ptr<NodeObject>
|
||||
fetchFrom(uint256 const& hash, std::uint32_t seq) override;
|
||||
|
||||
void
|
||||
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
|
||||
{
|
||||
Backends b = getBackends();
|
||||
b.archiveBackend->for_each(f);
|
||||
b.writableBackend->for_each(f);
|
||||
}
|
||||
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override;
|
||||
};
|
||||
|
||||
} // namespace NodeStore
|
||||
|
||||
Reference in New Issue
Block a user