diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 1b51d62292..b8fd3dac4a 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -591,6 +591,7 @@ target_sources (rippled PRIVATE src/ripple/rpc/handlers/LogLevel.cpp src/ripple/rpc/handlers/LogRotate.cpp src/ripple/rpc/handlers/Manifest.cpp + src/ripple/rpc/handlers/NodeToShardStatus.cpp src/ripple/rpc/handlers/NoRippleCheck.cpp src/ripple/rpc/handlers/OwnerInfo.cpp src/ripple/rpc/handlers/PathFind.cpp diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 3f80c2ae01..674811ce66 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -916,7 +916,7 @@ public: using namespace std::chrono; auto const start = steady_clock::now(); - m_nodeStore->import(*source); + m_nodeStore->importDatabase(*source); auto const elapsed = duration_cast(steady_clock::now() - start); @@ -2109,7 +2109,7 @@ ApplicationImp::nodeToShards() JLOG(m_journal.fatal()) << "Invalid [shard_db] configuration"; return false; } - shardStore_->import(getNodeStore()); + shardStore_->importDatabase(getNodeStore()); return true; } diff --git a/src/ripple/app/main/Main.cpp b/src/ripple/app/main/Main.cpp index c36294d89d..7f1ce1f41d 100644 --- a/src/ripple/app/main/Main.cpp +++ b/src/ripple/app/main/Main.cpp @@ -156,7 +156,8 @@ printHelp(const po::options_description& desc) " ledger_current\n" " ledger_request \n" " log_level [[] ]\n" - " logrotate \n" + " logrotate\n" + " nodetoshard_status\n" " peers\n" " ping\n" " random\n" diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index 4daa5f66d6..a3e36fb085 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -301,9 +301,31 @@ SHAMapStoreImp::run() state_db_.setLastRotated(lastRotated); } + bool const readyToRotate = + validatedSeq >= lastRotated + deleteInterval_ && + canDelete_ >= lastRotated - 1 && !health(); + + // Make sure we don't delete ledgers currently being + // imported into the ShardStore + bool const waitForImport = readyToRotate && [this, lastRotated] { + if (auto shardStore = app_.getShardStore()) + { + if (auto sequence = shardStore->getDatabaseImportSequence()) + return sequence <= lastRotated - 1; + } + + return false; + }(); + + if (waitForImport) + { + JLOG(journal_.info()) + << "NOT rotating validatedSeq " << validatedSeq + << " as rotation would interfere with ShardStore import"; + } + // will delete up to (not including) lastRotated - if (validatedSeq >= lastRotated + deleteInterval_ && - canDelete_ >= lastRotated - 1 && !health()) + if (readyToRotate && !waitForImport) { JLOG(journal_.warn()) << "rotating validatedSeq " << validatedSeq << " lastRotated " diff --git a/src/ripple/net/impl/RPCCall.cpp b/src/ripple/net/impl/RPCCall.cpp index 24fbda8762..d688f79bb6 100644 --- a/src/ripple/net/impl/RPCCall.cpp +++ b/src/ripple/net/impl/RPCCall.cpp @@ -1257,6 +1257,7 @@ public: {"log_level", &RPCParser::parseLogLevel, 0, 2}, {"logrotate", &RPCParser::parseAsIs, 0, 0}, {"manifest", &RPCParser::parseManifest, 1, 1}, + {"nodetoshard_status", &RPCParser::parseAsIs, 0, 0}, {"owner_info", &RPCParser::parseAccountItems, 1, 3}, {"peers", &RPCParser::parseAsIs, 0, 0}, {"ping", &RPCParser::parseAsIs, 0, 0}, diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h index 861378551e..c9c00f9450 100644 --- a/src/ripple/nodestore/Database.h +++ b/src/ripple/nodestore/Database.h @@ -82,7 +82,7 @@ public: /** Import objects from another database. */ virtual void - import(Database& source) = 0; + importDatabase(Database& source) = 0; /** Retrieve the estimated number of pending write operations. This is used for diagnostics. diff --git a/src/ripple/nodestore/DatabaseShard.h b/src/ripple/nodestore/DatabaseShard.h index 1c71f073c8..a43292aa8a 100644 --- a/src/ripple/nodestore/DatabaseShard.h +++ b/src/ripple/nodestore/DatabaseShard.h @@ -97,7 +97,9 @@ public: virtual std::string getPreShards() = 0; - /** Import a shard into the shard database + /** Import a shard from the shard archive handler into the + shard database. This differs from 'importDatabase' which + imports the contents of the NodeStore @param shardIndex Shard index to import @param srcDir The directory to import from @@ -268,6 +270,17 @@ public: virtual boost::filesystem::path const& getRootDir() const = 0; + virtual Json::Value + getDatabaseImportStatus() const = 0; + + /** Returns the first ledger sequence of the shard currently being imported + from the NodeStore + + @return The ledger sequence or an unseated value if no import is running + */ + virtual std::optional + getDatabaseImportSequence() const = 0; + /** The number of ledgers in a shard */ static constexpr std::uint32_t ledgersPerShardDefault{16384u}; }; diff --git a/src/ripple/nodestore/impl/DatabaseNodeImp.h b/src/ripple/nodestore/impl/DatabaseNodeImp.h index c479082b07..ae0a99fdb7 100644 --- a/src/ripple/nodestore/impl/DatabaseNodeImp.h +++ b/src/ripple/nodestore/impl/DatabaseNodeImp.h @@ -97,7 +97,7 @@ public: } void - import(Database& source) override + importDatabase(Database& source) override { importInternal(*backend_.get(), source); } diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp index 24ddb61859..56ec0a35a8 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp @@ -69,7 +69,7 @@ DatabaseRotatingImp::getWriteLoad() const } void -DatabaseRotatingImp::import(Database& source) +DatabaseRotatingImp::importDatabase(Database& source) { auto const backend = [&] { std::lock_guard lock(mutex_); diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.h b/src/ripple/nodestore/impl/DatabaseRotatingImp.h index 985344102f..dc095adcc8 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.h +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.h @@ -58,7 +58,7 @@ public: getWriteLoad() const override; void - import(Database& source) override; + importDatabase(Database& source) override; bool isSameDB(std::uint32_t, std::uint32_t) override { diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 60bc7361df..fa27a0887e 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -151,12 +151,12 @@ DatabaseShardImp::init() continue; } - // Check if a previous import failed - if (is_regular_file(shardDir / importMarker_)) + // Check if a previous database import failed + if (is_regular_file(shardDir / databaseImportMarker_)) { JLOG(j_.warn()) << "shard " << shardIndex - << " previously failed import, removing"; + << " previously failed database import, removing"; remove_all(shardDir); continue; } @@ -358,7 +358,19 @@ DatabaseShardImp::prepareShards(std::vector const& shardIndexes) return fail("is already stored", shardIndex); if (preparedIndexes_.find(shardIndex) != preparedIndexes_.end()) - return fail("is already queued for import", shardIndex); + return fail( + "is already queued for import from the shard archive handler", + shardIndex); + + if (databaseImportStatus_) + { + if (auto shard = databaseImportStatus_->currentShard.lock(); shard) + { + if (shard->index() == shardIndex) + return fail( + "is being imported from the nodestore", shardIndex); + } + } // Any shard earlier than the two most recent shards // is a historical shard @@ -710,102 +722,151 @@ DatabaseShardImp::stop() JLOG(j_.warn()) << " shard " << shard->index() << " unexpired"; } } + + // Notify the shard being imported + // from the node store to stop + if (databaseImportStatus_) + { + // A node store import is in progress + if (auto importShard = databaseImportStatus_->currentShard.lock(); + importShard) + importShard->stop(); + } + + // Wait for the node store import thread + // if necessary + if (databaseImporter_.joinable()) + databaseImporter_.join(); } void -DatabaseShardImp::import(Database& source) +DatabaseShardImp::importDatabase(Database& source) { + std::lock_guard lock(mutex_); + assert(init_); + + // Only the application local node store can be imported + assert(&source == &app_.getNodeStore()); + + if (databaseImporter_.joinable()) + { + assert(false); + JLOG(j_.error()) << "database import already in progress"; + return; + } + + // Run the lengthy node store import process in the background + // on a dedicated thread. + databaseImporter_ = std::thread([this] { doImportDatabase(); }); +} + +void +DatabaseShardImp::doImportDatabase() +{ + if (isStopping()) + return; + + auto loadLedger = + [this](char const* const sortOrder) -> std::optional { + std::shared_ptr ledger; + std::uint32_t ledgerSeq{0}; + std::optional info; + if (sortOrder == std::string("asc")) + { + info = dynamic_cast( + &app_.getRelationalDBInterface()) + ->getLimitedOldestLedgerInfo(earliestLedgerSeq()); + } + else + { + info = dynamic_cast( + &app_.getRelationalDBInterface()) + ->getLimitedNewestLedgerInfo(earliestLedgerSeq()); + } + if (info) + { + ledger = loadLedgerHelper(*info, app_, false); + ledgerSeq = info->seq; + } + if (!ledger || ledgerSeq == 0) + { + JLOG(j_.error()) << "no suitable ledgers were found in" + " the SQLite database to import"; + return std::nullopt; + } + return ledgerSeq; + }; + + // Find earliest ledger sequence stored + auto const earliestLedgerSeq{loadLedger("asc")}; + if (!earliestLedgerSeq) + return; + + auto const earliestIndex = [&] { + auto earliestIndex = seqToShardIndex(*earliestLedgerSeq); + + // Consider only complete shards + if (earliestLedgerSeq != firstLedgerSeq(earliestIndex)) + ++earliestIndex; + + return earliestIndex; + }(); + + // Find last ledger sequence stored + auto const latestLedgerSeq = loadLedger("desc"); + if (!latestLedgerSeq) + return; + + auto const latestIndex = [&] { + auto latestIndex = seqToShardIndex(*latestLedgerSeq); + + // Consider only complete shards + if (latestLedgerSeq != lastLedgerSeq(latestIndex)) + --latestIndex; + + return latestIndex; + }(); + + if (latestIndex < earliestIndex) + { + JLOG(j_.error()) << "no suitable ledgers were found in" + " the SQLite database to import"; + return; + } + + JLOG(j_.debug()) << "Importing ledgers for shards " << earliestIndex + << " through " << latestIndex; + { std::lock_guard lock(mutex_); - assert(init_); - // Only the application local node store can be imported - if (&source != &app_.getNodeStore()) - { - assert(false); - JLOG(j_.error()) << "invalid source database"; + assert(!databaseImportStatus_); + databaseImportStatus_ = std::make_unique( + earliestIndex, latestIndex, 0); + } + + // Import the shards + for (std::uint32_t shardIndex = earliestIndex; shardIndex <= latestIndex; + ++shardIndex) + { + if (isStopping()) return; - } - std::uint32_t earliestIndex; - std::uint32_t latestIndex; - { - auto loadLedger = [&](bool ascendSort = - true) -> std::optional { - std::shared_ptr ledger; - std::uint32_t ledgerSeq{0}; - std::optional info; - if (ascendSort) - { - info = - dynamic_cast( - &app_.getRelationalDBInterface()) - ->getLimitedOldestLedgerInfo(earliestLedgerSeq()); - } - else - { - info = - dynamic_cast( - &app_.getRelationalDBInterface()) - ->getLimitedNewestLedgerInfo(earliestLedgerSeq()); - } - if (info) - { - ledger = loadLedgerHelper(*info, app_, false); - ledgerSeq = info->seq; - } - if (!ledger || ledgerSeq == 0) - { - JLOG(j_.error()) << "no suitable ledgers were found in" - " the SQLite database to import"; - return std::nullopt; - } - return ledgerSeq; - }; + auto const pathDesignation = [this, shardIndex] { + std::lock_guard lock(mutex_); - // Find earliest ledger sequence stored - auto ledgerSeq{loadLedger()}; - if (!ledgerSeq) - return; - earliestIndex = seqToShardIndex(*ledgerSeq); - - // Consider only complete shards - if (ledgerSeq != firstLedgerSeq(earliestIndex)) - ++earliestIndex; - - // Find last ledger sequence stored - ledgerSeq = loadLedger(false); - if (!ledgerSeq) - return; - latestIndex = seqToShardIndex(*ledgerSeq); - - // Consider only complete shards - if (ledgerSeq != lastLedgerSeq(latestIndex)) - --latestIndex; - - if (latestIndex < earliestIndex) - { - JLOG(j_.error()) << "no suitable ledgers were found in" - " the SQLite database to import"; - return; - } - } - - auto numHistShards = this->numHistoricalShards(lock); - - // Import the shards - for (std::uint32_t shardIndex = earliestIndex; - shardIndex <= latestIndex; - ++shardIndex) - { + auto const numHistShards = numHistoricalShards(lock); auto const pathDesignation = prepareForNewShard(shardIndex, numHistShards, lock); - if (!pathDesignation) - break; + return pathDesignation; + }(); - auto const needsHistoricalPath = - *pathDesignation == PathDesignation::historical; + if (!pathDesignation) + break; + + { + std::lock_guard lock(mutex_); // Skip if being acquired if (shardIndex == acquireIndex_) @@ -815,7 +876,7 @@ DatabaseShardImp::import(Database& source) continue; } - // Skip if being imported + // Skip if being imported from the shard archive handler if (preparedIndexes_.find(shardIndex) != preparedIndexes_.end()) { JLOG(j_.debug()) @@ -829,134 +890,174 @@ DatabaseShardImp::import(Database& source) JLOG(j_.debug()) << "shard " << shardIndex << " already stored"; continue; } + } - // Verify SQLite ledgers are in the node store - { - auto const firstSeq{firstLedgerSeq(shardIndex)}; - auto const lastSeq{ - std::max(firstSeq, lastLedgerSeq(shardIndex))}; - auto const numLedgers{ - shardIndex == earliestShardIndex() ? lastSeq - firstSeq + 1 - : ledgersPerShard_}; - auto ledgerHashes{ - app_.getRelationalDBInterface().getHashesByIndex( - firstSeq, lastSeq)}; - if (ledgerHashes.size() != numLedgers) - continue; + std::uint32_t const firstSeq = firstLedgerSeq(shardIndex); + std::uint32_t const lastSeq = + std::max(firstSeq, lastLedgerSeq(shardIndex)); + std::uint32_t const numLedgers = shardIndex == earliestShardIndex() + ? lastSeq - firstSeq + 1 + : ledgersPerShard_; - bool valid{true}; - for (std::uint32_t n = firstSeq; n <= lastSeq; n += 256) - { - if (!source.fetchNodeObject(ledgerHashes[n].ledgerHash, n)) - { - JLOG(j_.warn()) << "SQLite ledger sequence " << n - << " mismatches node store"; - valid = false; - break; - } - } - if (!valid) - continue; - } - - auto const path = - needsHistoricalPath ? chooseHistoricalPath(lock) : dir_; - - // Create the new shard - auto shard{ - std::make_unique(app_, *this, shardIndex, path, j_)}; - if (!shard->init(scheduler_, *ctx_)) + // Verify SQLite ledgers are in the node store + { + auto const ledgerHashes{ + app_.getRelationalDBInterface().getHashesByIndex( + firstSeq, lastSeq)}; + if (ledgerHashes.size() != numLedgers) continue; - // Create a marker file to signify an import in progress - auto const shardDir{path / std::to_string(shardIndex)}; - auto const markerFile{shardDir / importMarker_}; + auto& source = app_.getNodeStore(); + bool valid{true}; + + for (std::uint32_t n = firstSeq; n <= lastSeq; ++n) { - std::ofstream ofs{markerFile.string()}; - if (!ofs.is_open()) + if (!source.fetchNodeObject(ledgerHashes.at(n).ledgerHash, n)) { - JLOG(j_.error()) << "shard " << shardIndex - << " failed to create temp marker file"; - shard->removeOnDestroy(); - continue; - } - ofs.close(); - } - - // Copy the ledgers from node store - std::shared_ptr recentStored; - std::optional lastLedgerHash; - - while (auto const ledgerSeq = shard->prepare()) - { - auto ledger{loadByIndex(*ledgerSeq, app_, false)}; - if (!ledger || ledger->info().seq != ledgerSeq) + JLOG(j_.warn()) << "SQLite ledger sequence " << n + << " mismatches node store"; + valid = false; break; - - auto const result{shard->storeLedger(ledger, recentStored)}; - storeStats(result.count, result.size); - if (result.error) - break; - - if (!shard->setLedgerStored(ledger)) - break; - - if (!lastLedgerHash && ledgerSeq == lastLedgerSeq(shardIndex)) - lastLedgerHash = ledger->info().hash; - - recentStored = std::move(ledger); - } - - using namespace boost::filesystem; - bool success{false}; - if (lastLedgerHash && shard->getState() == Shard::complete) - { - // Store shard final key - Serializer s; - s.add32(Shard::version); - s.add32(firstLedgerSeq(shardIndex)); - s.add32(lastLedgerSeq(shardIndex)); - s.addBitString(*lastLedgerHash); - auto const nodeObject{NodeObject::createObject( - hotUNKNOWN, std::move(s.modData()), Shard::finalKey)}; - - if (shard->storeNodeObject(nodeObject)) - { - try - { - // The import process is complete and the - // marker file is no longer required - remove_all(markerFile); - - JLOG(j_.debug()) << "shard " << shardIndex - << " was successfully imported"; - finalizeShard( - shards_.emplace(shardIndex, std::move(shard)) - .first->second, - true, - std::nullopt); - success = true; - - if (shardIndex < shardBoundaryIndex()) - ++numHistShards; - } - catch (std::exception const& e) - { - JLOG(j_.fatal()) << "shard index " << shardIndex - << ". Exception caught in function " - << __func__ << ". Error: " << e.what(); - } } } + if (!valid) + continue; + } - if (!success) + if (isStopping()) + return; + + bool const needsHistoricalPath = + *pathDesignation == PathDesignation::historical; + + auto const path = needsHistoricalPath + ? chooseHistoricalPath(std::lock_guard(mutex_)) + : dir_; + + // Create the new shard + auto shard{std::make_shared(app_, *this, shardIndex, path, j_)}; + if (!shard->init(scheduler_, *ctx_)) + continue; + + { + std::lock_guard lock(mutex_); + + if (isStopping()) + return; + + databaseImportStatus_->currentIndex = shardIndex; + databaseImportStatus_->currentShard = shard; + databaseImportStatus_->firstSeq = firstSeq; + databaseImportStatus_->lastSeq = lastSeq; + } + + // Create a marker file to signify a database import in progress + auto const shardDir{path / std::to_string(shardIndex)}; + auto const markerFile{shardDir / databaseImportMarker_}; + { + std::ofstream ofs{markerFile.string()}; + if (!ofs.is_open()) { - JLOG(j_.error()) - << "shard " << shardIndex << " failed to import"; + JLOG(j_.error()) << "shard " << shardIndex + << " failed to create temp marker file"; shard->removeOnDestroy(); + continue; } } + // Copy the ledgers from node store + std::shared_ptr recentStored; + std::optional lastLedgerHash; + + while (auto const ledgerSeq = shard->prepare()) + { + if (isStopping()) + return; + + auto const ledger{loadByIndex(*ledgerSeq, app_, false)}; + if (!ledger || ledger->info().seq != ledgerSeq) + break; + + auto const result{shard->storeLedger(ledger, recentStored)}; + storeStats(result.count, result.size); + if (result.error) + break; + + if (!shard->setLedgerStored(ledger)) + break; + + if (!lastLedgerHash && ledgerSeq == lastSeq) + lastLedgerHash = ledger->info().hash; + + recentStored = std::move(ledger); + } + + if (isStopping()) + return; + + using namespace boost::filesystem; + bool success{false}; + if (lastLedgerHash && shard->getState() == Shard::complete) + { + // Store shard final key + Serializer s; + s.add32(Shard::version); + s.add32(firstLedgerSeq(shardIndex)); + s.add32(lastLedgerSeq(shardIndex)); + s.addBitString(*lastLedgerHash); + auto const nodeObject{NodeObject::createObject( + hotUNKNOWN, std::move(s.modData()), Shard::finalKey)}; + + if (shard->storeNodeObject(nodeObject)) + { + try + { + std::lock_guard lock(mutex_); + + // The database import process is complete and the + // marker file is no longer required + remove_all(markerFile); + + JLOG(j_.debug()) << "shard " << shardIndex + << " was successfully imported" + " from the NodeStore"; + finalizeShard( + shards_.emplace(shardIndex, std::move(shard)) + .first->second, + true, + std::nullopt); + + // This variable is meant to capture the success + // of everything up to the point of shard finalization. + // If the shard fails to finalize, this condition will + // be handled by the finalization function itself, and + // not here. + success = true; + } + catch (std::exception const& e) + { + JLOG(j_.fatal()) << "shard index " << shardIndex + << ". Exception caught in function " + << __func__ << ". Error: " << e.what(); + } + } + } + + if (!success) + { + JLOG(j_.error()) << "shard " << shardIndex + << " failed to import from the NodeStore"; + shard->removeOnDestroy(); + } + } + + { + std::lock_guard lock(mutex_); + + if (isStopping()) + return; + + databaseImportStatus_.reset(); updateStatus(lock); } @@ -1103,6 +1204,43 @@ DatabaseShardImp::sweep() } } +Json::Value +DatabaseShardImp::getDatabaseImportStatus() const +{ + Json::Value ret(Json::objectValue); + + if (std::lock_guard lock(mutex_); databaseImportStatus_) + { + ret[jss::firstShardIndex] = databaseImportStatus_->earliestIndex; + ret[jss::lastShardIndex] = databaseImportStatus_->latestIndex; + ret[jss::currentShardIndex] = databaseImportStatus_->currentIndex; + + Json::Value currentShard(Json::objectValue); + currentShard[jss::firstSequence] = databaseImportStatus_->firstSeq; + currentShard[jss::lastSequence] = databaseImportStatus_->lastSeq; + + if (auto shard = databaseImportStatus_->currentShard.lock(); shard) + currentShard[jss::storedSeqs] = shard->getStoredSeqs(); + + ret[jss::currentShard] = currentShard; + } + else + ret = "Database import not running"; + + return ret; +} + +std::optional +DatabaseShardImp::getDatabaseImportSequence() const +{ + std::lock_guard lock(mutex_); + + if (!databaseImportStatus_) + return {}; + + return databaseImportStatus_->firstSeq; +} + bool DatabaseShardImp::initConfig(std::lock_guard const&) { @@ -1279,7 +1417,7 @@ DatabaseShardImp::findAcquireIndex( void DatabaseShardImp::finalizeShard( std::shared_ptr& shard, - bool writeSQLite, + bool const writeSQLite, std::optional const& expectedHash) { taskQueue_.addTask([this, @@ -1738,11 +1876,11 @@ DatabaseShardImp::relocateOutdatedShards( } } -std::optional +auto DatabaseShardImp::prepareForNewShard( std::uint32_t shardIndex, std::uint32_t numHistoricalShards, - std::lock_guard const& lock) + std::lock_guard const& lock) -> std::optional { // Any shard earlier than the two most recent shards is a historical shard auto const boundaryIndex{shardBoundaryIndex()}; diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index e270229111..1f22ba5829 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -134,7 +134,10 @@ public: @param source The application node store. */ void - import(Database& source) override; + importDatabase(Database& source) override; + + void + doImportDatabase(); std::int32_t getWriteLoad() const override; @@ -161,6 +164,12 @@ public: void sweep() override; + Json::Value + getDatabaseImportStatus() const override; + + std::optional + getDatabaseImportSequence() const override; + bool callForLedgerSQL( LedgerIndex ledgerSeq, @@ -203,6 +212,37 @@ private: historical // Needs a historical path }; + struct DatabaseImportStatus + { + DatabaseImportStatus( + std::uint32_t const earliestIndex, + std::uint32_t const latestIndex, + std::uint32_t const currentIndex) + : earliestIndex(earliestIndex) + , latestIndex(latestIndex) + , currentIndex(currentIndex) + { + } + + // Index of the first shard to be imported + std::uint32_t earliestIndex{0}; + + // Index of the last shard to be imported + std::uint32_t latestIndex{0}; + + // Index of the shard currently being imported + std::uint32_t currentIndex{0}; + + // First ledger sequence of the current shard + std::uint32_t firstSeq{0}; + + // Last ledger sequence of the current shard + std::uint32_t lastSeq{0}; + + // The shard currently being imported + std::weak_ptr currentShard; + }; + Application& app_; mutable std::mutex mutex_; bool init_{false}; @@ -216,7 +256,7 @@ private: // Shards held by this server std::map> shards_; - // Shard indexes being imported + // Shard indexes being imported from the shard archive handler std::set preparedIndexes_; // Shard index being acquired from the peer network @@ -258,7 +298,7 @@ private: std::uint32_t const openFinalLimit_; // File name used to mark shards being imported from node store - static constexpr auto importMarker_ = "import"; + static constexpr auto databaseImportMarker_ = "database_import"; // latestShardIndex_ and secondLatestShardIndex hold the indexes // of the shards most recently confirmed by the network. These @@ -270,6 +310,12 @@ private: std::optional latestShardIndex_; std::optional secondLatestShardIndex_; + // Struct used for node store import progress + std::unique_ptr databaseImportStatus_; + + // Thread for running node store import + std::thread databaseImporter_; + // Initialize settings from the configuration file // Lock must be held bool @@ -284,7 +330,7 @@ private: void for_each(std::function)> f) override { - Throw("Shard store import not supported"); + Throw("Import from shard store not supported"); } // Randomly select a shard index not stored diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index d21d75266b..877b21cb0c 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -197,6 +197,15 @@ public: removeOnDestroy_ = true; } + std::string + getStoredSeqs() + { + if (!acquireInfo_) + return ""; + + return to_string(acquireInfo_->storedSeqs); + } + /** * @brief callForLedgerSQL Checks out ledger database for the shard and * calls given callback function passing shard index and session diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index a67e2628a3..13942b5b4d 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -179,6 +179,8 @@ JSS(converge_time); // out: NetworkOPs JSS(converge_time_s); // out: NetworkOPs JSS(count); // in: AccountTx*, ValidatorList JSS(counters); // in/out: retrieve counters +JSS(currentShard); // out: NodeToShardStatus +JSS(currentShardIndex); // out: NodeToShardStatus JSS(currency); // in: paths/PathRequest, STAmount // out: STPathSet, STAmount, // AccountLines @@ -245,6 +247,8 @@ JSS(fee_mult_max); // in: TransactionSign JSS(fee_ref); // out: NetworkOPs JSS(fetch_pack); // out: NetworkOPs JSS(first); // out: rpc/Version +JSS(firstSequence); // out: NodeToShardStatus +JSS(firstShardIndex); // out: NodeToShardStatus JSS(finished); JSS(fix_txns); // in: LedgerCleaner JSS(flags); // out: AccountOffers, @@ -295,6 +299,8 @@ JSS(key); // out JSS(key_type); // in/out: WalletPropose, TransactionSign JSS(latency); // out: PeerImp JSS(last); // out: RPCVersion +JSS(lastSequence); // out: NodeToShardStatus +JSS(lastShardIndex); // out: NodeToShardStatus JSS(last_close); // out: NetworkOPs JSS(last_refresh_time); // out: ValidatorSite JSS(last_refresh_status); // out: ValidatorSite @@ -513,6 +519,7 @@ JSS(state_accounting); // out: NetworkOPs JSS(state_now); // in: Subscribe JSS(status); // error JSS(stop); // in: LedgerCleaner +JSS(storedSeqs); // out: NodeToShardStatus JSS(streams); // in: Subscribe, Unsubscribe JSS(strict); // in: AccountCurrencies, AccountInfo JSS(sub_index); // in: LedgerEntry diff --git a/src/ripple/rpc/handlers/Handlers.h b/src/ripple/rpc/handlers/Handlers.h index 915d01ce92..ec8a0ff355 100644 --- a/src/ripple/rpc/handlers/Handlers.h +++ b/src/ripple/rpc/handlers/Handlers.h @@ -89,6 +89,8 @@ doLogRotate(RPC::JsonContext&); Json::Value doManifest(RPC::JsonContext&); Json::Value +doNodeToShardStatus(RPC::JsonContext&); +Json::Value doNoRippleCheck(RPC::JsonContext&); Json::Value doOwnerInfo(RPC::JsonContext&); diff --git a/src/ripple/rpc/handlers/NodeToShardStatus.cpp b/src/ripple/rpc/handlers/NodeToShardStatus.cpp new file mode 100644 index 0000000000..f90f4d5730 --- /dev/null +++ b/src/ripple/rpc/handlers/NodeToShardStatus.cpp @@ -0,0 +1,43 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2021 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { + +Json::Value +doNodeToShardStatus(RPC::JsonContext& context) +{ + Json::Value ret(Json::objectValue); + + if (auto const shardStore = context.app.getShardStore()) + ret[jss::info] = shardStore->getDatabaseImportStatus(); + else + ret = RPC::make_error(rpcINTERNAL, "No shard store"); + + return ret; +} + +} // namespace ripple diff --git a/src/ripple/rpc/impl/Handler.cpp b/src/ripple/rpc/impl/Handler.cpp index dd7b404a53..87515d9f16 100644 --- a/src/ripple/rpc/impl/Handler.cpp +++ b/src/ripple/rpc/impl/Handler.cpp @@ -107,6 +107,10 @@ Handler const handlerArray[]{ {"log_level", byRef(&doLogLevel), Role::ADMIN, NO_CONDITION}, {"logrotate", byRef(&doLogRotate), Role::ADMIN, NO_CONDITION}, {"manifest", byRef(&doManifest), Role::USER, NO_CONDITION}, + {"nodetoshard_status", + byRef(&doNodeToShardStatus), + Role::USER, + NO_CONDITION}, {"noripple_check", byRef(&doNoRippleCheck), Role::USER, NO_CONDITION}, {"owner_info", byRef(&doOwnerInfo), Role::USER, NEEDS_CURRENT_LEDGER}, {"peers", byRef(&doPeers), Role::ADMIN, NO_CONDITION}, diff --git a/src/test/nodestore/DatabaseShard_test.cpp b/src/test/nodestore/DatabaseShard_test.cpp index 824e1ae264..935bd8daf7 100644 --- a/src/test/nodestore/DatabaseShard_test.cpp +++ b/src/test/nodestore/DatabaseShard_test.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -34,6 +35,7 @@ #include #include #include +#include #include namespace ripple { @@ -1043,53 +1045,6 @@ class DatabaseShard_test : public TestBase } } - void - testImport(std::uint64_t const seedValue) - { - testcase("Import node store"); - - using namespace test::jtx; - - beast::temp_dir shardDir; - { - beast::temp_dir nodeDir; - Env env{*this, testConfig(shardDir.path(), nodeDir.path())}; - DatabaseShard* db = env.app().getShardStore(); - Database& ndb = env.app().getNodeStore(); - BEAST_EXPECT(db); - - TestData data(seedValue, 4, 2); - if (!BEAST_EXPECT(data.makeLedgers(env))) - return; - - for (std::uint32_t i = 0; i < 2 * ledgersPerShard; ++i) - BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i])); - - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0)); - db->import(ndb); - for (std::uint32_t i = 1; i <= 2; ++i) - waitShard(*db, i); - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x6)); - } - { - Env env{*this, testConfig(shardDir.path())}; - DatabaseShard* db = env.app().getShardStore(); - BEAST_EXPECT(db); - - TestData data(seedValue, 4, 2); - if (!BEAST_EXPECT(data.makeLedgers(env))) - return; - - for (std::uint32_t i = 1; i <= 2; ++i) - waitShard(*db, i); - - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x6)); - - for (std::uint32_t i = 0; i < 2 * ledgersPerShard; ++i) - checkLedger(data, *db, *data.ledgers_[i]); - } - } - std::string ripemd160File(std::string filename) { @@ -1169,6 +1124,137 @@ class DatabaseShard_test : public TestBase } } + void + testImport(std::uint64_t const seedValue) + { + testcase("Import node store"); + + using namespace test::jtx; + + beast::temp_dir shardDir; + { + beast::temp_dir nodeDir; + Env env{*this, testConfig(shardDir.path(), nodeDir.path())}; + DatabaseShard* db = env.app().getShardStore(); + Database& ndb = env.app().getNodeStore(); + BEAST_EXPECT(db); + + TestData data(seedValue, 4, 2); + if (!BEAST_EXPECT(data.makeLedgers(env))) + return; + + for (std::uint32_t i = 0; i < 2 * ledgersPerShard; ++i) + BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i])); + + BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0)); + db->importDatabase(ndb); + for (std::uint32_t i = 1; i <= 2; ++i) + waitShard(*db, i); + BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x6)); + } + { + Env env{*this, testConfig(shardDir.path())}; + DatabaseShard* db = env.app().getShardStore(); + BEAST_EXPECT(db); + + TestData data(seedValue, 4, 2); + if (!BEAST_EXPECT(data.makeLedgers(env))) + return; + + for (std::uint32_t i = 1; i <= 2; ++i) + waitShard(*db, i); + + BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x6)); + + for (std::uint32_t i = 0; i < 2 * ledgersPerShard; ++i) + checkLedger(data, *db, *data.ledgers_[i]); + } + } + + void + testImportWithOnlineDelete(std::uint64_t const seedValue) + { + testcase("Import node store with online delete"); + + using namespace test::jtx; + using test::CaptureLogs; + + beast::temp_dir shardDir; + beast::temp_dir nodeDir; + std::string capturedLogs; + + { + auto c = testConfig(shardDir.path(), nodeDir.path()); + auto& section = c->section(ConfigSection::nodeDatabase()); + section.set("online_delete", "550"); + + // Adjust the log level to capture relevant output + c->section(SECTION_RPC_STARTUP) + .append( + "{ \"command\": \"log_level\", \"severity\": \"trace\" " + "}"); + + std::unique_ptr logs(new CaptureLogs(&capturedLogs)); + Env env{*this, std::move(c), std::move(logs)}; + + DatabaseShard* db = env.app().getShardStore(); + Database& ndb = env.app().getNodeStore(); + BEAST_EXPECT(db); + + auto const shardCount = 4; + TestData data(seedValue, 4, shardCount); + if (!BEAST_EXPECT(data.makeLedgers(env))) + return; + + // Start the import + db->importDatabase(ndb); + + while (!db->getDatabaseImportSequence()) + { + // Wait until the import starts + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + auto& store = env.app().getSHAMapStore(); + auto lastRotated = store.getLastRotated(); + + auto pauseVerifier = std::thread([lastRotated, &store, db, this] { + while (true) + { + // Make sure database rotations dont interfere + // with the import + + if (store.getLastRotated() != lastRotated) + { + // A rotation occurred during shard import. Not + // necessarily an error + auto sequence = db->getDatabaseImportSequence(); + BEAST_EXPECT(!sequence || sequence >= lastRotated); + + break; + } + } + }); + + data = TestData(seedValue * 2, 4, shardCount); + if (!BEAST_EXPECT(data.makeLedgers(env, shardCount))) + { + pauseVerifier.join(); + return; + } + + pauseVerifier.join(); + BEAST_EXPECT(store.getLastRotated() != lastRotated); + } + + // Database rotation should have been postponed at some + // point during the import + auto const expectedLogMessage = + "rotation would interfere with ShardStore import"; + BEAST_EXPECT( + capturedLogs.find(expectedLogMessage) != std::string::npos); + } + void testImportWithHistoricalPaths(std::uint64_t const seedValue) { @@ -1204,19 +1290,19 @@ class DatabaseShard_test : public TestBase Database& ndb = env.app().getNodeStore(); BEAST_EXPECT(db); - auto const ledgerCount = 4; + auto const shardCount = 4; - TestData data(seedValue, 4, ledgerCount); + TestData data(seedValue, 4, shardCount); if (!BEAST_EXPECT(data.makeLedgers(env))) return; - for (std::uint32_t i = 0; i < ledgerCount * ledgersPerShard; ++i) + for (std::uint32_t i = 0; i < shardCount * ledgersPerShard; ++i) BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i])); BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0)); - db->import(ndb); - for (std::uint32_t i = 1; i <= ledgerCount; ++i) + db->importDatabase(ndb); + for (std::uint32_t i = 1; i <= shardCount; ++i) waitShard(*db, i); BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0b11110)); @@ -1242,7 +1328,7 @@ class DatabaseShard_test : public TestBase // All historical shards should be stored // at historical paths - BEAST_EXPECT(historicalPathCount == ledgerCount - 2); + BEAST_EXPECT(historicalPathCount == shardCount - 2); } // Test importing with a single historical @@ -1262,19 +1348,19 @@ class DatabaseShard_test : public TestBase Database& ndb = env.app().getNodeStore(); BEAST_EXPECT(db); - auto const ledgerCount = 4; + auto const shardCount = 4; - TestData data(seedValue * 2, 4, ledgerCount); + TestData data(seedValue * 2, 4, shardCount); if (!BEAST_EXPECT(data.makeLedgers(env))) return; - for (std::uint32_t i = 0; i < ledgerCount * ledgersPerShard; ++i) + for (std::uint32_t i = 0; i < shardCount * ledgersPerShard; ++i) BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i])); BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0)); - db->import(ndb); - for (std::uint32_t i = 1; i <= ledgerCount; ++i) + db->importDatabase(ndb); + for (std::uint32_t i = 1; i <= shardCount; ++i) waitShard(*db, i); BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0b11110)); @@ -1293,7 +1379,7 @@ class DatabaseShard_test : public TestBase // All historical shards should be stored // at historical paths - BEAST_EXPECT(historicalPathCount == ledgerCount - 2); + BEAST_EXPECT(historicalPathCount == shardCount - 2); } } @@ -1331,9 +1417,9 @@ class DatabaseShard_test : public TestBase DatabaseShard* db = env.app().getShardStore(); BEAST_EXPECT(db); - auto const ledgerCount = 4; + auto const shardCount = 4; - TestData data(seedValue, 4, ledgerCount); + TestData data(seedValue, 4, shardCount); if (!BEAST_EXPECT(data.makeLedgers(env))) return; @@ -1341,10 +1427,10 @@ class DatabaseShard_test : public TestBase std::uint64_t bitMask = 0; // Add ten shards to the Shard Database - for (std::uint32_t i = 0; i < ledgerCount; ++i) + for (std::uint32_t i = 0; i < shardCount; ++i) { - auto n = createShard(data, *db, ledgerCount); - if (!BEAST_EXPECT(n && *n >= 1 && *n <= ledgerCount)) + auto n = createShard(data, *db, shardCount); + if (!BEAST_EXPECT(n && *n >= 1 && *n <= shardCount)) return; bitMask |= 1ll << *n; BEAST_EXPECT( @@ -1405,19 +1491,19 @@ class DatabaseShard_test : public TestBase // All historical shards should be stored // at historical paths - BEAST_EXPECT(historicalPathCount == ledgerCount - 2); + BEAST_EXPECT(historicalPathCount == shardCount - 2); - data = TestData(seedValue * 2, 4, ledgerCount); - if (!BEAST_EXPECT(data.makeLedgers(env, ledgerCount))) + data = TestData(seedValue * 2, 4, shardCount); + if (!BEAST_EXPECT(data.makeLedgers(env, shardCount))) return; // Add ten more shards to the Shard Database // to exercise recent shard rotation - for (std::uint32_t i = 0; i < ledgerCount; ++i) + for (std::uint32_t i = 0; i < shardCount; ++i) { - auto n = createShard(data, *db, ledgerCount * 2, ledgerCount); + auto n = createShard(data, *db, shardCount * 2, shardCount); if (!BEAST_EXPECT( - n && *n >= 1 + ledgerCount && *n <= ledgerCount * 2)) + n && *n >= 1 + shardCount && *n <= shardCount * 2)) return; bitMask |= 1ll << *n; BEAST_EXPECT( @@ -1467,7 +1553,7 @@ class DatabaseShard_test : public TestBase // All historical shards should be stored // at historical paths - BEAST_EXPECT(historicalPathCount == (ledgerCount * 2) - 2); + BEAST_EXPECT(historicalPathCount == (shardCount * 2) - 2); } } @@ -1529,21 +1615,26 @@ public: void run() override { - std::uint64_t const seedValue = 51; + auto seedValue = [] { + static std::uint64_t seedValue = 41; + seedValue += 10; + return seedValue; + }; testStandalone(); - testCreateShard(seedValue); - testReopenDatabase(seedValue + 10); - testGetCompleteShards(seedValue + 20); - testPrepareShards(seedValue + 30); - testImportShard(seedValue + 40); - testCorruptedDatabase(seedValue + 50); - testIllegalFinalKey(seedValue + 60); - testImport(seedValue + 70); - testDeterministicShard(seedValue + 80); - testImportWithHistoricalPaths(seedValue + 90); - testPrepareWithHistoricalPaths(seedValue + 100); - testOpenShardManagement(seedValue + 110); + testCreateShard(seedValue()); + testReopenDatabase(seedValue()); + testGetCompleteShards(seedValue()); + testPrepareShards(seedValue()); + testImportShard(seedValue()); + testCorruptedDatabase(seedValue()); + testIllegalFinalKey(seedValue()); + testDeterministicShard(seedValue()); + testImport(seedValue()); + testImportWithOnlineDelete(seedValue()); + testImportWithHistoricalPaths(seedValue()); + testPrepareWithHistoricalPaths(seedValue()); + testOpenShardManagement(seedValue()); } }; diff --git a/src/test/nodestore/Database_test.cpp b/src/test/nodestore/Database_test.cpp index dd8af68c78..bdbc96b524 100644 --- a/src/test/nodestore/Database_test.cpp +++ b/src/test/nodestore/Database_test.cpp @@ -487,7 +487,7 @@ public: srcBackendType + "'"); // Do the import - dest->import(*src); + dest->importDatabase(*src); // Get the results of the import fetchCopyOfBatch(*dest, ©, batch); diff --git a/src/test/rpc/RPCCall_test.cpp b/src/test/rpc/RPCCall_test.cpp index b029202039..cf4fbab1b1 100644 --- a/src/test/rpc/RPCCall_test.cpp +++ b/src/test/rpc/RPCCall_test.cpp @@ -4301,6 +4301,37 @@ static RPCCallTestData const rpcCallTestArray[] = { ] })"}, + // nodetoshard_status + // ------------------------------------------------------------------- + {"nodetoshard_status: minimal.", + __LINE__, + { + "nodetoshard_status", + }, + RPCCallTestData::no_exception, + R"({ + "method" : "nodetoshard_status", + "params" : [ + { + "api_version" : %MAX_API_VER%, + } + ] + })"}, + {"nodetoshard_status: too many arguments.", + __LINE__, + {"nodetoshard_status", "extra"}, + RPCCallTestData::no_exception, + R"({ + "method" : "nodetoshard_status", + "params" : [ + { + "error" : "badSyntax", + "error_code" : 1, + "error_message" : "Syntax error." + } + ] + })"}, + // owner_info // ------------------------------------------------------------------ {"owner_info: minimal.",