#include namespace xrpl { namespace NodeStore { DatabaseRotatingImp::DatabaseRotatingImp( Scheduler& scheduler, int readThreads, std::shared_ptr writableBackend, std::shared_ptr archiveBackend, Section const& config, beast::Journal j) : DatabaseRotating(scheduler, readThreads, config, j) , writableBackend_(std::move(writableBackend)) , archiveBackend_(std::move(archiveBackend)) { if (writableBackend_) fdRequired_ += writableBackend_->fdRequired(); if (archiveBackend_) fdRequired_ += archiveBackend_->fdRequired(); } void DatabaseRotatingImp::rotate( std::unique_ptr&& newBackend, std::function const& f) { // Pass these two names to the callback function std::string const newWritableBackendName = newBackend->getName(); std::string newArchiveBackendName; // Hold on to current archive backend pointer until after the // callback finishes. Only then will the archive directory be // deleted. std::shared_ptr oldArchiveBackend; { std::lock_guard lock(mutex_); archiveBackend_->setDeletePath(); oldArchiveBackend = std::move(archiveBackend_); archiveBackend_ = std::move(writableBackend_); newArchiveBackendName = archiveBackend_->getName(); writableBackend_ = std::move(newBackend); } f(newWritableBackendName, newArchiveBackendName); } std::string DatabaseRotatingImp::getName() const { std::lock_guard lock(mutex_); return writableBackend_->getName(); } std::int32_t DatabaseRotatingImp::getWriteLoad() const { std::lock_guard lock(mutex_); return writableBackend_->getWriteLoad(); } void DatabaseRotatingImp::importDatabase(Database& source) { auto const backend = [&] { std::lock_guard lock(mutex_); return writableBackend_; }(); importInternal(*backend, source); } void DatabaseRotatingImp::sync() { std::lock_guard lock(mutex_); writableBackend_->sync(); } void DatabaseRotatingImp::store(NodeObjectType type, Blob&& data, uint256 const& hash, std::uint32_t) { auto nObj = NodeObject::createObject(type, std::move(data), hash); auto const backend = [&] { std::lock_guard lock(mutex_); return writableBackend_; }(); backend->store(nObj); storeStats(1, nObj->getData().size()); } std::shared_ptr DatabaseRotatingImp::fetchNodeObject(uint256 const& hash, std::uint32_t, FetchReport& fetchReport, bool duplicate) { auto fetch = [&](std::shared_ptr const& backend) { Status status; std::shared_ptr nodeObject; try { status = backend->fetch(hash.data(), &nodeObject); } catch (std::exception const& e) { JLOG(j_.fatal()) << "Exception, " << e.what(); Rethrow(); } switch (status) { case ok: case notFound: break; case dataCorrupt: JLOG(j_.fatal()) << "Corrupt NodeObject #" << hash; break; default: JLOG(j_.warn()) << "Unknown status=" << status; break; } return nodeObject; }; // See if the node object exists in the cache std::shared_ptr nodeObject; auto [writable, archive] = [&] { std::lock_guard lock(mutex_); return std::make_pair(writableBackend_, archiveBackend_); }(); // Try to fetch from the writable backend nodeObject = fetch(writable); if (!nodeObject) { // Otherwise try to fetch from the archive backend nodeObject = fetch(archive); if (nodeObject) { { // Refresh the writable backend pointer std::lock_guard lock(mutex_); writable = writableBackend_; } // Update writable backend with data from the archive backend if (duplicate) writable->store(nodeObject); } } if (nodeObject) fetchReport.wasFound = true; return nodeObject; } void DatabaseRotatingImp::for_each(std::function)> f) { auto [writable, archive] = [&] { std::lock_guard lock(mutex_); return std::make_pair(writableBackend_, archiveBackend_); }(); // Iterate the writable backend writable->for_each(f); // Iterate the archive backend archive->for_each(f); } } // namespace NodeStore } // namespace xrpl