Files
rippled/src/libxrpl/nodestore/DatabaseRotatingImp.cpp

196 lines
4.7 KiB
C++

#include <xrpl/nodestore/detail/DatabaseRotatingImp.h>
namespace ripple {
namespace NodeStore {
DatabaseRotatingImp::DatabaseRotatingImp(
Scheduler& scheduler,
int readThreads,
std::shared_ptr<Backend> writableBackend,
std::shared_ptr<Backend> archiveBackend,
Section const& config,
beast::Journal j)
: DatabaseRotating(scheduler, readThreads, config, j)
, writableBackend_(std::move(writableBackend))
, archiveBackend_(std::move(archiveBackend))
{
if (writableBackend_)
fdRequired_ += writableBackend_->fdRequired();
if (archiveBackend_)
fdRequired_ += archiveBackend_->fdRequired();
}
void
DatabaseRotatingImp::rotate(
std::unique_ptr<NodeStore::Backend>&& newBackend,
std::function<void(
std::string const& writableName,
std::string const& archiveName)> const& f)
{
// Pass these two names to the callback function
std::string const newWritableBackendName = newBackend->getName();
std::string newArchiveBackendName;
// Hold on to current archive backend pointer until after the
// callback finishes. Only then will the archive directory be
// deleted.
std::shared_ptr<NodeStore::Backend> oldArchiveBackend;
{
std::lock_guard lock(mutex_);
archiveBackend_->setDeletePath();
oldArchiveBackend = std::move(archiveBackend_);
archiveBackend_ = std::move(writableBackend_);
newArchiveBackendName = archiveBackend_->getName();
writableBackend_ = std::move(newBackend);
}
f(newWritableBackendName, newArchiveBackendName);
}
std::string
DatabaseRotatingImp::getName() const
{
std::lock_guard lock(mutex_);
return writableBackend_->getName();
}
std::int32_t
DatabaseRotatingImp::getWriteLoad() const
{
std::lock_guard lock(mutex_);
return writableBackend_->getWriteLoad();
}
void
DatabaseRotatingImp::importDatabase(Database& source)
{
auto const backend = [&] {
std::lock_guard lock(mutex_);
return writableBackend_;
}();
importInternal(*backend, source);
}
void
DatabaseRotatingImp::sync()
{
std::lock_guard lock(mutex_);
writableBackend_->sync();
}
void
DatabaseRotatingImp::store(
NodeObjectType type,
Blob&& data,
uint256 const& hash,
std::uint32_t)
{
auto nObj = NodeObject::createObject(type, std::move(data), hash);
auto const backend = [&] {
std::lock_guard lock(mutex_);
return writableBackend_;
}();
backend->store(nObj);
storeStats(1, nObj->getData().size());
}
void
DatabaseRotatingImp::sweep()
{
// nothing to do
}
std::shared_ptr<NodeObject>
DatabaseRotatingImp::fetchNodeObject(
uint256 const& hash,
std::uint32_t,
FetchReport& fetchReport,
bool duplicate)
{
auto fetch = [&](std::shared_ptr<Backend> const& backend) {
Status status;
std::shared_ptr<NodeObject> nodeObject;
try
{
status = backend->fetch(hash.data(), &nodeObject);
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "Exception, " << e.what();
Rethrow();
}
switch (status)
{
case ok:
case notFound:
break;
case dataCorrupt:
JLOG(j_.fatal()) << "Corrupt NodeObject #" << hash;
break;
default:
JLOG(j_.warn()) << "Unknown status=" << status;
break;
}
return nodeObject;
};
// See if the node object exists in the cache
std::shared_ptr<NodeObject> nodeObject;
auto [writable, archive] = [&] {
std::lock_guard lock(mutex_);
return std::make_pair(writableBackend_, archiveBackend_);
}();
// Try to fetch from the writable backend
nodeObject = fetch(writable);
if (!nodeObject)
{
// Otherwise try to fetch from the archive backend
nodeObject = fetch(archive);
if (nodeObject)
{
{
// Refresh the writable backend pointer
std::lock_guard lock(mutex_);
writable = writableBackend_;
}
// Update writable backend with data from the archive backend
if (duplicate)
writable->store(nodeObject);
}
}
if (nodeObject)
fetchReport.wasFound = true;
return nodeObject;
}
void
DatabaseRotatingImp::for_each(
std::function<void(std::shared_ptr<NodeObject>)> f)
{
auto [writable, archive] = [&] {
std::lock_guard lock(mutex_);
return std::make_pair(writableBackend_, archiveBackend_);
}();
// Iterate the writable backend
writable->for_each(f);
// Iterate the archive backend
archive->for_each(f);
}
} // namespace NodeStore
} // namespace ripple