mirror of
https://github.com/Xahau/xahaud.git
synced 2025-12-06 17:27:52 +00:00
refactor: Change recursive_mutex to mutex in DatabaseRotatingImp (#5276)
Rewrites the code so that the lock is not held during the callback. Instead, it locks twice: once before the callback and once after. This is safe due to the structure of the code, but it is also checked after the second lock. This allows mutex_ to be changed back to a regular mutex.
This commit is contained in:
@@ -18,6 +18,7 @@ test.app > xrpl.basics
|
||||
test.app > xrpld.app
|
||||
test.app > xrpld.core
|
||||
test.app > xrpld.ledger
|
||||
test.app > xrpld.nodestore
|
||||
test.app > xrpld.overlay
|
||||
test.app > xrpld.rpc
|
||||
test.app > xrpl.hook
|
||||
|
||||
@@ -20,9 +20,11 @@
|
||||
#include <test/jtx.h>
|
||||
#include <test/jtx/envconfig.h>
|
||||
#include <xrpld/app/main/Application.h>
|
||||
#include <xrpld/app/main/NodeStoreScheduler.h>
|
||||
#include <xrpld/app/misc/SHAMapStore.h>
|
||||
#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
|
||||
#include <xrpld/core/ConfigSections.h>
|
||||
#include <xrpld/nodestore/detail/DatabaseRotatingImp.h>
|
||||
#include <xrpl/protocol/jss.h>
|
||||
|
||||
namespace ripple {
|
||||
@@ -518,12 +520,137 @@ public:
|
||||
lastRotated = ledgerSeq - 1;
|
||||
}
|
||||
|
||||
std::unique_ptr<NodeStore::Backend>
|
||||
makeBackendRotating(
|
||||
jtx::Env& env,
|
||||
NodeStoreScheduler& scheduler,
|
||||
std::string path)
|
||||
{
|
||||
Section section{
|
||||
env.app().config().section(ConfigSection::nodeDatabase())};
|
||||
boost::filesystem::path newPath;
|
||||
|
||||
if (!BEAST_EXPECT(path.size()))
|
||||
return {};
|
||||
newPath = path;
|
||||
section.set("path", newPath.string());
|
||||
|
||||
auto backend{NodeStore::Manager::instance().make_Backend(
|
||||
section,
|
||||
megabytes(env.app().config().getValueFor(
|
||||
SizedItem::burstSize, std::nullopt)),
|
||||
scheduler,
|
||||
env.app().logs().journal("NodeStoreTest"))};
|
||||
backend->open();
|
||||
return backend;
|
||||
}
|
||||
|
||||
void
|
||||
testRotate()
|
||||
{
|
||||
// The only purpose of this test is to ensure that if something that
|
||||
// should never happen happens, we don't get a deadlock.
|
||||
testcase("rotate with lock contention");
|
||||
|
||||
using namespace jtx;
|
||||
Env env(*this, envconfig(onlineDelete));
|
||||
|
||||
/////////////////////////////////////////////////////////////
|
||||
// Create the backend. Normally, SHAMapStoreImp handles all these
|
||||
// details
|
||||
auto nscfg = env.app().config().section(ConfigSection::nodeDatabase());
|
||||
|
||||
// Provide default values:
|
||||
if (!nscfg.exists("cache_size"))
|
||||
nscfg.set(
|
||||
"cache_size",
|
||||
std::to_string(env.app().config().getValueFor(
|
||||
SizedItem::treeCacheSize, std::nullopt)));
|
||||
|
||||
if (!nscfg.exists("cache_age"))
|
||||
nscfg.set(
|
||||
"cache_age",
|
||||
std::to_string(env.app().config().getValueFor(
|
||||
SizedItem::treeCacheAge, std::nullopt)));
|
||||
|
||||
NodeStoreScheduler scheduler(env.app().getJobQueue());
|
||||
|
||||
std::string const writableDb = "write";
|
||||
std::string const archiveDb = "archive";
|
||||
auto writableBackend = makeBackendRotating(env, scheduler, writableDb);
|
||||
auto archiveBackend = makeBackendRotating(env, scheduler, archiveDb);
|
||||
|
||||
// Create NodeStore with two backends to allow online deletion of
|
||||
// data
|
||||
constexpr int readThreads = 4;
|
||||
auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
|
||||
env.app(),
|
||||
scheduler,
|
||||
readThreads,
|
||||
std::move(writableBackend),
|
||||
std::move(archiveBackend),
|
||||
nscfg,
|
||||
env.app().logs().journal("NodeStoreTest"));
|
||||
|
||||
/////////////////////////////////////////////////////////////
|
||||
// Check basic functionality
|
||||
using namespace std::chrono_literals;
|
||||
std::atomic<int> threadNum = 0;
|
||||
|
||||
{
|
||||
auto newBackend = makeBackendRotating(
|
||||
env, scheduler, std::to_string(++threadNum));
|
||||
|
||||
auto const cb = [&](std::string const& writableName,
|
||||
std::string const& archiveName) {
|
||||
BEAST_EXPECT(writableName == "1");
|
||||
BEAST_EXPECT(archiveName == "write");
|
||||
// Ensure that dbr functions can be called from within the
|
||||
// callback
|
||||
BEAST_EXPECT(dbr->getName() == "1");
|
||||
};
|
||||
|
||||
dbr->rotate(std::move(newBackend), cb);
|
||||
}
|
||||
BEAST_EXPECT(threadNum == 1);
|
||||
BEAST_EXPECT(dbr->getName() == "1");
|
||||
|
||||
/////////////////////////////////////////////////////////////
|
||||
// Do something stupid. Try to re-enter rotate from inside the callback.
|
||||
{
|
||||
auto const cb = [&](std::string const& writableName,
|
||||
std::string const& archiveName) {
|
||||
BEAST_EXPECT(writableName == "3");
|
||||
BEAST_EXPECT(archiveName == "2");
|
||||
// Ensure that dbr functions can be called from within the
|
||||
// callback
|
||||
BEAST_EXPECT(dbr->getName() == "3");
|
||||
};
|
||||
auto const cbReentrant = [&](std::string const& writableName,
|
||||
std::string const& archiveName) {
|
||||
BEAST_EXPECT(writableName == "2");
|
||||
BEAST_EXPECT(archiveName == "1");
|
||||
auto newBackend = makeBackendRotating(
|
||||
env, scheduler, std::to_string(++threadNum));
|
||||
// Reminder: doing this is stupid and should never happen
|
||||
dbr->rotate(std::move(newBackend), cb);
|
||||
};
|
||||
auto newBackend = makeBackendRotating(
|
||||
env, scheduler, std::to_string(++threadNum));
|
||||
dbr->rotate(std::move(newBackend), cbReentrant);
|
||||
}
|
||||
|
||||
BEAST_EXPECT(threadNum == 3);
|
||||
BEAST_EXPECT(dbr->getName() == "3");
|
||||
}
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testClear();
|
||||
testAutomatic();
|
||||
testCanDelete();
|
||||
testRotate();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -369,17 +369,17 @@ SHAMapStoreImp::run()
|
||||
|
||||
lastRotated = validatedSeq;
|
||||
|
||||
dbRotating_->rotateWithLock(
|
||||
[&](std::string const& writableBackendName) {
|
||||
dbRotating_->rotate(
|
||||
std::move(newBackend),
|
||||
[&](std::string const& writableName,
|
||||
std::string const& archiveName) {
|
||||
SavedState savedState;
|
||||
savedState.writableDb = newBackend->getName();
|
||||
savedState.archiveDb = writableBackendName;
|
||||
savedState.writableDb = writableName;
|
||||
savedState.archiveDb = archiveName;
|
||||
savedState.lastRotated = lastRotated;
|
||||
state_db_.setState(savedState);
|
||||
|
||||
clearCaches(validatedSeq);
|
||||
|
||||
return std::move(newBackend);
|
||||
});
|
||||
|
||||
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
|
||||
|
||||
@@ -44,11 +44,17 @@ public:
|
||||
|
||||
/** Rotates the backends.
|
||||
|
||||
@param f A function executed before the rotation and under the same lock
|
||||
@param newBackend New writable backend
|
||||
@param f A function executed after the rotation outside of lock. The
|
||||
values passed to f will be the new backend database names _after_
|
||||
rotation.
|
||||
*/
|
||||
virtual void
|
||||
rotateWithLock(std::function<std::unique_ptr<NodeStore::Backend>(
|
||||
std::string const& writableBackendName)> const& f) = 0;
|
||||
rotate(
|
||||
std::unique_ptr<NodeStore::Backend>&& newBackend,
|
||||
std::function<void(
|
||||
std::string const& writableName,
|
||||
std::string const& archiveName)> const& f) = 0;
|
||||
};
|
||||
|
||||
} // namespace NodeStore
|
||||
|
||||
@@ -45,67 +45,84 @@ DatabaseRotatingImp::DatabaseRotatingImp(
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseRotatingImp::rotateWithLock(
|
||||
std::function<std::unique_ptr<NodeStore::Backend>(
|
||||
std::string const& writableBackendName)> const& f)
|
||||
DatabaseRotatingImp::rotate(
|
||||
std::unique_ptr<NodeStore::Backend>&& newBackend,
|
||||
std::function<void(
|
||||
std::string const& writableName,
|
||||
std::string const& archiveName)> const& f)
|
||||
{
|
||||
std::lock_guard lock(mutex_);
|
||||
// Pass these two names to the callback function
|
||||
std::string const newWritableBackendName = newBackend->getName();
|
||||
std::string newArchiveBackendName;
|
||||
// Hold on to current archive backend pointer until after the
|
||||
// callback finishes. Only then will the archive directory be
|
||||
// deleted.
|
||||
std::shared_ptr<NodeStore::Backend> oldArchiveBackend;
|
||||
{
|
||||
std::lock_guard lock(mutex_);
|
||||
|
||||
// Create the new backend
|
||||
auto newBackend = f(writableBackend_->getName());
|
||||
// Before rotating, ensure all pinned ledgers are in the writable
|
||||
// backend
|
||||
JLOG(j_.info())
|
||||
<< "Ensuring pinned ledgers are preserved before backend rotation";
|
||||
|
||||
// Before rotating, ensure all pinned ledgers are in the writable backend
|
||||
JLOG(j_.info())
|
||||
<< "Ensuring pinned ledgers are preserved before backend rotation";
|
||||
// Use a lambda to handle the preservation of pinned ledgers
|
||||
auto ensurePinnedLedgersInWritable = [this]() {
|
||||
// Get list of pinned ledgers
|
||||
auto pinnedLedgers =
|
||||
app_.getLedgerMaster().getPinnedLedgersRangeSet();
|
||||
|
||||
// Use a lambda to handle the preservation of pinned ledgers
|
||||
auto ensurePinnedLedgersInWritable = [this]() {
|
||||
// Get list of pinned ledgers
|
||||
auto pinnedLedgers = app_.getLedgerMaster().getPinnedLedgersRangeSet();
|
||||
|
||||
for (auto const& range : pinnedLedgers)
|
||||
{
|
||||
for (auto seq = range.lower(); seq <= range.upper(); ++seq)
|
||||
for (auto const& range : pinnedLedgers)
|
||||
{
|
||||
uint256 hash = app_.getLedgerMaster().getHashBySeq(seq);
|
||||
if (hash.isZero())
|
||||
continue;
|
||||
for (auto seq = range.lower(); seq <= range.upper(); ++seq)
|
||||
{
|
||||
uint256 hash = app_.getLedgerMaster().getHashBySeq(seq);
|
||||
if (hash.isZero())
|
||||
continue;
|
||||
|
||||
// Try to load the ledger
|
||||
auto ledger = app_.getLedgerMaster().getLedgerByHash(hash);
|
||||
if (ledger && ledger->isImmutable())
|
||||
{
|
||||
// If we have the ledger, store it in the writable backend
|
||||
JLOG(j_.debug()) << "Ensuring pinned ledger " << seq
|
||||
<< " is in writable backend";
|
||||
// TQ: TODO: check this
|
||||
Database::storeLedger(*ledger, writableBackend_);
|
||||
}
|
||||
else
|
||||
{
|
||||
// If we don't have the ledger in memory, try to fetch its
|
||||
// objects directly
|
||||
JLOG(j_.debug()) << "Attempting to copy pinned ledger "
|
||||
<< seq << " header to writable backend";
|
||||
std::shared_ptr<NodeObject> headerObj;
|
||||
Status status =
|
||||
archiveBackend_->fetch(hash.data(), &headerObj);
|
||||
if (status == ok && headerObj)
|
||||
writableBackend_->store(headerObj);
|
||||
// Try to load the ledger
|
||||
auto ledger = app_.getLedgerMaster().getLedgerByHash(hash);
|
||||
if (ledger && ledger->isImmutable())
|
||||
{
|
||||
// If we have the ledger, store it in the writable
|
||||
// backend
|
||||
JLOG(j_.debug()) << "Ensuring pinned ledger " << seq
|
||||
<< " is in writable backend";
|
||||
// TQ: TODO: check this
|
||||
Database::storeLedger(*ledger, writableBackend_);
|
||||
}
|
||||
else
|
||||
{
|
||||
// If we don't have the ledger in memory, try to fetch
|
||||
// its objects directly
|
||||
JLOG(j_.debug())
|
||||
<< "Attempting to copy pinned ledger " << seq
|
||||
<< " header to writable backend";
|
||||
std::shared_ptr<NodeObject> headerObj;
|
||||
Status status =
|
||||
archiveBackend_->fetch(hash.data(), &headerObj);
|
||||
if (status == ok && headerObj)
|
||||
writableBackend_->store(headerObj);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
// Execute the lambda
|
||||
ensurePinnedLedgersInWritable();
|
||||
// Execute the lambda
|
||||
ensurePinnedLedgersInWritable();
|
||||
|
||||
// Now it's safe to mark the archive backend for deletion
|
||||
archiveBackend_->setDeletePath();
|
||||
// Now it's safe to mark the archive backend for deletion
|
||||
archiveBackend_->setDeletePath();
|
||||
oldArchiveBackend = std::move(archiveBackend_);
|
||||
|
||||
// Complete the rotation
|
||||
archiveBackend_ = std::move(writableBackend_);
|
||||
writableBackend_ = std::move(newBackend);
|
||||
// Complete the rotation
|
||||
archiveBackend_ = std::move(writableBackend_);
|
||||
newArchiveBackendName = archiveBackend_->getName();
|
||||
|
||||
writableBackend_ = std::move(newBackend);
|
||||
}
|
||||
|
||||
f(newWritableBackendName, newArchiveBackendName);
|
||||
}
|
||||
|
||||
std::string
|
||||
|
||||
@@ -52,9 +52,11 @@ public:
|
||||
}
|
||||
|
||||
void
|
||||
rotateWithLock(
|
||||
std::function<std::unique_ptr<NodeStore::Backend>(
|
||||
std::string const& writableBackendName)> const& f) override;
|
||||
rotate(
|
||||
std::unique_ptr<NodeStore::Backend>&& newBackend,
|
||||
std::function<void(
|
||||
std::string const& writableName,
|
||||
std::string const& archiveName)> const& f) override;
|
||||
|
||||
std::string
|
||||
getName() const override;
|
||||
@@ -88,13 +90,7 @@ public:
|
||||
private:
|
||||
std::shared_ptr<Backend> writableBackend_;
|
||||
std::shared_ptr<Backend> archiveBackend_;
|
||||
// This needs to be a recursive mutex because callbacks in `rotateWithLock`
|
||||
// can call function that also lock the mutex. A current example of this is
|
||||
// a callback from SHAMapStoreImp, which calls `clearCaches`. This
|
||||
// `clearCaches` call eventually calls `fetchNodeObject` which tries to
|
||||
// relock the mutex. It would be desirable to rewrite the code so the lock
|
||||
// was not held during a callback.
|
||||
mutable std::recursive_mutex mutex_;
|
||||
mutable std::mutex mutex_;
|
||||
|
||||
std::shared_ptr<NodeObject>
|
||||
fetchNodeObject(
|
||||
|
||||
Reference in New Issue
Block a user