diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake
index 4d931ac9c..731ea8bf6 100644
--- a/Builds/CMake/RippledCore.cmake
+++ b/Builds/CMake/RippledCore.cmake
@@ -598,7 +598,7 @@ target_sources (rippled PRIVATE
     src/ripple/rpc/handlers/LogLevel.cpp
     src/ripple/rpc/handlers/LogRotate.cpp
     src/ripple/rpc/handlers/Manifest.cpp
-    src/ripple/rpc/handlers/NodeToShardStatus.cpp
+    src/ripple/rpc/handlers/NodeToShard.cpp
     src/ripple/rpc/handlers/NoRippleCheck.cpp
     src/ripple/rpc/handlers/OwnerInfo.cpp
     src/ripple/rpc/handlers/PathFind.cpp
@@ -934,6 +934,7 @@ if (tests)
     src/test/rpc/LedgerRPC_test.cpp
     src/test/rpc/LedgerRequestRPC_test.cpp
     src/test/rpc/ManifestRPC_test.cpp
+    src/test/rpc/NodeToShardRPC_test.cpp
     src/test/rpc/NoRippleCheck_test.cpp
     src/test/rpc/NoRipple_test.cpp
     src/test/rpc/OwnerInfo_test.cpp
diff --git a/src/ripple/app/main/Main.cpp b/src/ripple/app/main/Main.cpp
index fd99f9abb..214233953 100644
--- a/src/ripple/app/main/Main.cpp
+++ b/src/ripple/app/main/Main.cpp
@@ -159,7 +159,7 @@ printHelp(const po::options_description& desc)
                  "     ledger_request \n"
                  "     log_level [[] ]\n"
                  "     logrotate\n"
-                 "     nodetoshard_status\n"
+                 "     node_to_shard [status|start|stop]\n"
                  "     peers\n"
                  "     ping\n"
                  "     random\n"
diff --git a/src/ripple/app/main/NodeStoreScheduler.cpp b/src/ripple/app/main/NodeStoreScheduler.cpp
index d94c7ee3f..0e6c14adb 100644
--- a/src/ripple/app/main/NodeStoreScheduler.cpp
+++ b/src/ripple/app/main/NodeStoreScheduler.cpp
@@ -29,6 +29,9 @@ NodeStoreScheduler::NodeStoreScheduler(JobQueue& jobQueue) : jobQueue_(jobQueue)
 void
 NodeStoreScheduler::scheduleTask(NodeStore::Task& task)
 {
+    if (jobQueue_.isStopped())
+        return;
+
     if (!jobQueue_.addJob(jtWRITE, "NodeObject::store", [&task](Job&) {
             task.performScheduledTask();
         }))
@@ -42,6 +45,9 @@ NodeStoreScheduler::scheduleTask(NodeStore::Task& task)
 void
 NodeStoreScheduler::onFetch(NodeStore::FetchReport const& report)
 {
+    if (jobQueue_.isStopped())
+        return;
+
     jobQueue_.addLoadEvents(
         report.fetchType == NodeStore::FetchType::async ? jtNS_ASYNC_READ
                                                         : jtNS_SYNC_READ,
@@ -52,6 +58,9 @@ NodeStoreScheduler::onFetch(NodeStore::FetchReport const& report)
 void
 NodeStoreScheduler::onBatchWrite(NodeStore::BatchWriteReport const& report)
 {
+    if (jobQueue_.isStopped())
+        return;
+
     jobQueue_.addLoadEvents(jtNS_WRITE, report.writeCount, report.elapsed);
 }
 
diff --git a/src/ripple/net/impl/RPCCall.cpp b/src/ripple/net/impl/RPCCall.cpp
index 5c91c6e8f..a8d72eda2 100644
--- a/src/ripple/net/impl/RPCCall.cpp
+++ b/src/ripple/net/impl/RPCCall.cpp
@@ -936,6 +936,15 @@ private:
         return jvRequest;
     }
 
+    Json::Value
+    parseNodeToShard(Json::Value const& jvParams)
+    {
+        Json::Value jvRequest;
+        jvRequest[jss::action] = jvParams[0u].asString();
+
+        return jvRequest;
+    }
+
     // peer_reservations_add []
     Json::Value
     parsePeerReservationsAdd(Json::Value const& jvParams)
@@ -1257,7 +1266,7 @@ public:
         {"log_level", &RPCParser::parseLogLevel, 0, 2},
         {"logrotate", &RPCParser::parseAsIs, 0, 0},
         {"manifest", &RPCParser::parseManifest, 1, 1},
-        {"nodetoshard_status", &RPCParser::parseAsIs, 0, 0},
+        {"node_to_shard", &RPCParser::parseNodeToShard, 1, 1},
         {"owner_info", &RPCParser::parseAccountItems, 1, 3},
         {"peers", &RPCParser::parseAsIs, 0, 0},
        {"ping", &RPCParser::parseAsIs, 0, 0},
diff --git a/src/ripple/nodestore/DatabaseShard.h b/src/ripple/nodestore/DatabaseShard.h
index f45d265a1..55f864f41 100644
--- a/src/ripple/nodestore/DatabaseShard.h
+++ b/src/ripple/nodestore/DatabaseShard.h
@@ -252,9 +252,25 @@ public:
     [[nodiscard]] virtual boost::filesystem::path const&
     getRootDir() const = 0;
 
+    /** Returns a JSON object detailing the status of an ongoing
+        database import if one is running, otherwise an error
+        object.
+    */
     virtual Json::Value
     getDatabaseImportStatus() const = 0;
 
+    /** Initiates a NodeStore to ShardStore import and returns
+        the result in a JSON object.
+    */
+    virtual Json::Value
+    startNodeToShard() = 0;
+
+    /** Terminates a NodeStore to ShardStore import and returns
+        the result in a JSON object.
+    */
+    virtual Json::Value
+    stopNodeToShard() = 0;
+
     /** Returns the first ledger sequence of the shard currently being imported
         from the NodeStore
 
diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp
index eb5bab141..1dc5dab6c 100644
--- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp
+++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp
@@ -722,6 +722,8 @@ DatabaseShardImp::stop()
         }
     }
 
+    std::unique_lock lock(mutex_);
+
     // Notify the shard being imported
     // from the node store to stop
     if (databaseImportStatus_)
@@ -735,7 +737,28 @@ DatabaseShardImp::stop()
     // Wait for the node store import thread
     // if necessary
     if (databaseImporter_.joinable())
-        databaseImporter_.join();
+    {
+        // Tells the import function to halt
+        haltDatabaseImport_ = true;
+
+        // Wait for the function to exit
+        while (databaseImportStatus_)
+        {
+            // Unlock just in case the import
+            // function is waiting on the mutex
+            lock.unlock();
+
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+            lock.lock();
+        }
+
+        // Calling join while holding the mutex_ without
+        // first making sure that doImportDatabase has
+        // exited could lead to deadlock via the mutex
+        // acquisition that occurs in that function
+        if (databaseImporter_.joinable())
+            databaseImporter_.join();
+    }
 }
 
 void
@@ -754,15 +777,19 @@ DatabaseShardImp::importDatabase(Database& source)
         return;
     }
 
-    // Run the lengthy node store import process in the background
-    // on a dedicated thread.
-    databaseImporter_ = std::thread([this] { doImportDatabase(); });
+    startDatabaseImportThread(lock);
 }
 
 void
 DatabaseShardImp::doImportDatabase()
 {
-    if (isStopping())
+    auto shouldHalt = [this] {
+        bool expected = true;
+        return haltDatabaseImport_.compare_exchange_strong(expected, false) ||
+            isStopping();
+    };
+
+    if (shouldHalt())
         return;
 
     auto loadLedger =
@@ -848,7 +875,7 @@ DatabaseShardImp::doImportDatabase()
     for (std::uint32_t shardIndex = earliestIndex; shardIndex <= latestIndex;
          ++shardIndex)
     {
-        if (isStopping())
+        if (shouldHalt())
             return;
 
         auto const pathDesignation = [this, shardIndex] {
@@ -920,7 +947,7 @@ DatabaseShardImp::doImportDatabase()
             continue;
         }
 
-        if (isStopping())
+        if (shouldHalt())
             return;
 
         bool const needsHistoricalPath =
@@ -938,7 +965,7 @@ DatabaseShardImp::doImportDatabase()
         {
             std::lock_guard lock(mutex_);
 
-            if (isStopping())
+            if (shouldHalt())
                 return;
 
             databaseImportStatus_->currentIndex = shardIndex;
@@ -967,7 +994,7 @@ DatabaseShardImp::doImportDatabase()
 
         while (auto const ledgerSeq = shard->prepare())
         {
-            if (isStopping())
+            if (shouldHalt())
                 return;
 
             // Not const so it may be moved later
@@ -989,7 +1016,7 @@ DatabaseShardImp::doImportDatabase()
             recentStored = std::move(ledger);
         }
 
-        if (isStopping())
+        if (shouldHalt())
             return;
 
         using namespace boost::filesystem;
@@ -1044,17 +1071,14 @@ DatabaseShardImp::doImportDatabase()
         {
             JLOG(j_.error()) << "shard " << shardIndex
                              << " failed to import from the NodeStore";
-            shard->removeOnDestroy();
+
+            if (shard)
+                shard->removeOnDestroy();
         }
     }
 
-    {
-        std::lock_guard lock(mutex_);
-        if (isStopping())
-            return;
-
-        databaseImportStatus_.reset();
-    }
+    if (shouldHalt())
+        return;
 
     updateFileStats();
 }
@@ -1200,10 +1224,10 @@ DatabaseShardImp::sweep()
 Json::Value
 DatabaseShardImp::getDatabaseImportStatus() const
 {
-    Json::Value ret(Json::objectValue);
-
     if (std::lock_guard lock(mutex_); databaseImportStatus_)
     {
+        Json::Value ret(Json::objectValue);
+
         ret[jss::firstShardIndex] = databaseImportStatus_->earliestIndex;
         ret[jss::lastShardIndex] = databaseImportStatus_->latestIndex;
         ret[jss::currentShardIndex] = databaseImportStatus_->currentIndex;
@@ -1216,11 +1240,59 @@ DatabaseShardImp::getDatabaseImportStatus() const
             currentShard[jss::storedSeqs] = shard->getStoredSeqs();
 
         ret[jss::currentShard] = currentShard;
-    }
-    else
-        ret = "Database import not running";
 
-    return ret;
+        if (haltDatabaseImport_)
+            ret[jss::message] = "Database import halt initiated...";
+
+        return ret;
+    }
+
+    return RPC::make_error(rpcINTERNAL, "Database import not running");
+}
+
+Json::Value
+DatabaseShardImp::startNodeToShard()
+{
+    std::lock_guard lock(mutex_);
+
+    if (!init_)
+        return RPC::make_error(rpcINTERNAL, "Shard store not initialized");
+
+    if (databaseImporter_.joinable())
+        return RPC::make_error(
+            rpcINTERNAL, "Database import already in progress");
+
+    if (isStopping())
+        return RPC::make_error(rpcINTERNAL, "Node is shutting down");
+
+    startDatabaseImportThread(lock);
+
+    Json::Value result(Json::objectValue);
+    result[jss::message] = "Database import initiated...";
+
+    return result;
+}
+
+Json::Value
+DatabaseShardImp::stopNodeToShard()
+{
+    std::lock_guard lock(mutex_);
+
+    if (!init_)
+        return RPC::make_error(rpcINTERNAL, "Shard store not initialized");
+
+    if (!databaseImporter_.joinable())
+        return RPC::make_error(rpcINTERNAL, "Database import not running");
+
+    if (isStopping())
+        return RPC::make_error(rpcINTERNAL, "Node is shutting down");
+
+    haltDatabaseImport_ = true;
+
+    Json::Value result(Json::objectValue);
+    result[jss::message] = "Database import halt initiated...";
+
+    return result;
 }
 
 std::optional
@@ -2131,6 +2203,27 @@ DatabaseShardImp::updatePeers(std::lock_guard const& lock) const
     }
 }
 
+void
+DatabaseShardImp::startDatabaseImportThread(std::lock_guard const&)
+{
+    // Run the lengthy node store import process in the background
+    // on a dedicated thread.
+    databaseImporter_ = std::thread([this] {
+        doImportDatabase();
+
+        std::lock_guard lock(mutex_);
+
+        // Make sure to clear this in case the import
+        // exited early.
+        databaseImportStatus_.reset();
+
+        // Detach the thread so subsequent attempts
+        // to start the import won't get held up by
+        // the old thread of execution
+        databaseImporter_.detach();
+    });
+}
+
 //------------------------------------------------------------------------------
 
 std::unique_ptr
diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h
index fde27260d..327ab4865 100644
--- a/src/ripple/nodestore/impl/DatabaseShardImp.h
+++ b/src/ripple/nodestore/impl/DatabaseShardImp.h
@@ -135,6 +135,12 @@ public:
     Json::Value
     getDatabaseImportStatus() const override;
 
+    Json::Value
+    startNodeToShard() override;
+
+    Json::Value
+    stopNodeToShard() override;
+
     std::optional
     getDatabaseImportSequence() const override;
 
@@ -285,6 +291,9 @@ private:
     // Thread for running node store import
     std::thread databaseImporter_;
 
+    // Indicates whether the import should stop
+    std::atomic_bool haltDatabaseImport_{false};
+
     // Initialize settings from the configuration file
     // Lock must be held
     bool
@@ -407,6 +416,10 @@ private:
     // Update peers with the status of every complete and incomplete shard
     void
     updatePeers(std::lock_guard const& lock) const;
+
+    // Start the node store import process
+    void
+    startDatabaseImportThread(std::lock_guard const&);
 };
 
 } // namespace NodeStore
diff --git a/src/ripple/rpc/handlers/Handlers.h b/src/ripple/rpc/handlers/Handlers.h
index 5fb42268f..264d0a3f1 100644
--- a/src/ripple/rpc/handlers/Handlers.h
+++ b/src/ripple/rpc/handlers/Handlers.h
@@ -89,7 +89,7 @@ doLogRotate(RPC::JsonContext&);
 Json::Value
 doManifest(RPC::JsonContext&);
 Json::Value
-doNodeToShardStatus(RPC::JsonContext&);
+doNodeToShard(RPC::JsonContext&);
 Json::Value
 doNoRippleCheck(RPC::JsonContext&);
 Json::Value
diff --git a/src/ripple/rpc/handlers/NodeToShard.cpp b/src/ripple/rpc/handlers/NodeToShard.cpp
new file mode 100644
index 000000000..ac250db18
--- /dev/null
+++ b/src/ripple/rpc/handlers/NodeToShard.cpp
@@ -0,0 +1,86 @@
+//------------------------------------------------------------------------------
+/*
+    This file is part of rippled: https://github.com/ripple/rippled
+    Copyright (c) 2021 Ripple Labs Inc.
+
+    Permission to use, copy, modify, and/or distribute this software for any
+    purpose with or without fee is hereby granted, provided that the above
+    copyright notice and this permission notice appear in all copies.
+
+    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+    ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace ripple {
+
+// node_to_shard [status|start|stop]
+Json::Value
+doNodeToShard(RPC::JsonContext& context)
+{
+    if (context.app.config().reporting())
+        return rpcError(rpcREPORTING_UNSUPPORTED);
+
+    // Shard store must be enabled
+    auto const shardStore = context.app.getShardStore();
+    if (!shardStore)
+        return rpcError(rpcINTERNAL, "No shard store");
+
+    if (!context.params.isMember(jss::action))
+        return RPC::missing_field_error(jss::action);
+
+    // Obtain and normalize the action to perform
+    auto const action = [&context] {
+        auto value = context.params[jss::action].asString();
+        boost::to_lower(value);
+
+        return value;
+    }();
+
+    // Vector of allowed actions
+    std::vector const allowedActions = {"status", "start", "stop"};
+
+    // Validate the action
+    if (std::find(allowedActions.begin(), allowedActions.end(), action) ==
+        allowedActions.end())
+        return RPC::invalid_field_error(jss::action);
+
+    // Perform the action
+    if (action == "status")
+    {
+        // Get the status of the database import
+        return shardStore->getDatabaseImportStatus();
+    }
+    else if (action == "start")
+    {
+        // Kick off an import
+        return shardStore->startNodeToShard();
+    }
+    else if (action == "stop")
+    {
+        // Halt an import
+        return shardStore->stopNodeToShard();
+    }
+    else
+    {
+        // Shouldn't happen
+        assert(false);
+        return rpcError(rpcINTERNAL);
+    }
+}
+
+} // namespace ripple
diff --git a/src/ripple/rpc/handlers/NodeToShardStatus.cpp b/src/ripple/rpc/handlers/NodeToShardStatus.cpp
index f90f4d573..e69de29bb 100644
--- a/src/ripple/rpc/handlers/NodeToShardStatus.cpp
+++ b/src/ripple/rpc/handlers/NodeToShardStatus.cpp
@@ -1,43 +0,0 @@
-//------------------------------------------------------------------------------
-/*
-    This file is part of rippled: https://github.com/ripple/rippled
-    Copyright (c) 2021 Ripple Labs Inc.
-
-    Permission to use, copy, modify, and/or distribute this software for any
-    purpose with or without fee is hereby granted, provided that the above
-    copyright notice and this permission notice appear in all copies.
-
-    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
-    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
-    ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
-    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-*/
-//==============================================================================
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-namespace ripple {
-
-Json::Value
-doNodeToShardStatus(RPC::JsonContext& context)
-{
-    Json::Value ret(Json::objectValue);
-
-    if (auto const shardStore = context.app.getShardStore())
-        ret[jss::info] = shardStore->getDatabaseImportStatus();
-    else
-        ret = RPC::make_error(rpcINTERNAL, "No shard store");
-
-    return ret;
-}
-
-} // namespace ripple
diff --git a/src/ripple/rpc/impl/Handler.cpp b/src/ripple/rpc/impl/Handler.cpp
index 6e313931f..7c193f554 100644
--- a/src/ripple/rpc/impl/Handler.cpp
+++ b/src/ripple/rpc/impl/Handler.cpp
@@ -107,10 +107,7 @@ Handler const handlerArray[]{
     {"log_level", byRef(&doLogLevel), Role::ADMIN, NO_CONDITION},
     {"logrotate", byRef(&doLogRotate), Role::ADMIN, NO_CONDITION},
     {"manifest", byRef(&doManifest), Role::USER, NO_CONDITION},
-    {"nodetoshard_status",
-     byRef(&doNodeToShardStatus),
-     Role::USER,
-     NO_CONDITION},
+    {"node_to_shard", byRef(&doNodeToShard), Role::ADMIN, NO_CONDITION},
     {"noripple_check", byRef(&doNoRippleCheck), Role::USER, NO_CONDITION},
     {"owner_info", byRef(&doOwnerInfo), Role::USER, NEEDS_CURRENT_LEDGER},
     {"peers", byRef(&doPeers), Role::ADMIN, NO_CONDITION},
diff --git a/src/test/nodestore/DatabaseShard_test.cpp b/src/test/nodestore/DatabaseShard_test.cpp
index e6f1e3e15..88bcd4977 100644
--- a/src/test/nodestore/DatabaseShard_test.cpp
+++ b/src/test/nodestore/DatabaseShard_test.cpp
@@ -19,6 +19,7 @@
 
 #include
 #include
+#include
 #include
 #include
 #include
@@ -263,6 +264,10 @@ class DatabaseShard_test : public TestBase
     {
         using namespace test::jtx;
 
+        // The local fee may go up, especially in the online delete tests
+        while (env_.app().getFeeTrack().lowerLocalFee())
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+
         if (isNewAccounts(seq))
             env_.fund(XRP(iniAmount), accounts_[nAccounts_[seq] - 1]);
 
@@ -1249,14 +1254,22 @@ class DatabaseShard_test : public TestBase
         Database& ndb = env.app().getNodeStore();
         BEAST_EXPECT(db);
 
+        auto& store = env.app().getSHAMapStore();
+
+        // Allow online delete to delete the startup ledgers
+        // so that it will take some time for the import to
+        // catch up to the point of the next rotation
+        store.setCanDelete(10);
+
         // Create some ledgers for the shard store to import
         auto const shardCount = 5;
         TestData data(seedValue, 4, shardCount);
         if (!BEAST_EXPECT(data.makeLedgers(env)))
             return;
 
-        auto& store = env.app().getSHAMapStore();
-        auto lastRotated = store.getLastRotated();
+        store.rendezvous();
+        auto const lastRotated = store.getLastRotated();
+        BEAST_EXPECT(lastRotated >= 553 && lastRotated < 1103);
 
         // Start the import
         db->importDatabase(ndb);
@@ -1267,37 +1280,45 @@ class DatabaseShard_test : public TestBase
             std::this_thread::sleep_for(std::chrono::milliseconds(1));
         }
 
-        // Enable online deletion now that the import has started
+        // Enable unimpeded online deletion now that the import has started
         store.setCanDelete(std::numeric_limits::max());
 
         auto pauseVerifier = std::thread([lastRotated, &store, db, this] {
-            while (true)
+            // The import should still be running when this thread starts
+            BEAST_EXPECT(db->getDatabaseImportSequence());
+
+            auto rotationProgress = lastRotated;
+            while (auto const ledgerSeq = db->getDatabaseImportSequence())
             {
                 // Make sure database rotations dont interfere
                 // with the import
-                if (store.getLastRotated() != lastRotated)
+                auto const last = store.getLastRotated();
+                if (last != rotationProgress)
                 {
                     // A rotation occurred during shard import. Not
                     // necessarily an error
-                    auto const ledgerSeq = db->getDatabaseImportSequence();
-                    BEAST_EXPECT(!ledgerSeq || ledgerSeq >= lastRotated);
-
-                    break;
+                    BEAST_EXPECT(
+                        !ledgerSeq || ledgerSeq >= rotationProgress);
+                    rotationProgress = last;
                 }
             }
         });
 
+        auto join = [&pauseVerifier]() {
+            if (pauseVerifier.joinable())
+                pauseVerifier.join();
+        };
+
         // Create more ledgers to trigger online deletion
         data = TestData(seedValue * 2);
         if (!BEAST_EXPECT(data.makeLedgers(env, shardCount)))
         {
-            pauseVerifier.join();
+            join();
             return;
         }
 
-        pauseVerifier.join();
+        join();
         BEAST_EXPECT(store.getLastRotated() != lastRotated);
     }
 
diff --git a/src/test/rpc/NodeToShardRPC_test.cpp b/src/test/rpc/NodeToShardRPC_test.cpp
new file mode 100644
index 000000000..64e089b0b
--- /dev/null
+++ b/src/test/rpc/NodeToShardRPC_test.cpp
@@ -0,0 +1,331 @@
+//------------------------------------------------------------------------------
+/*
+    This file is part of rippled: https://github.com/ripple/rippled
+    Copyright (c) 2021 Ripple Labs Inc.
+
+    Permission to use, copy, modify, and/or distribute this software for any
+    purpose with or without fee is hereby granted, provided that the above
+    copyright notice and this permission notice appear in all copies.
+
+    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+    ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace ripple {
+namespace test {
+
+class NodeToShardRPC_test : public beast::unit_test::suite
+{
+    bool
+    importCompleted(
+        NodeStore::DatabaseShard* shardStore,
+        std::uint8_t const numberOfShards,
+        Json::Value const& result)
+    {
+        auto const info = shardStore->getShardInfo();
+
+        // Assume completed if the import isn't running
+        auto const completed =
+            result[jss::error_message] == "Database import not running";
+
+        if (completed)
+        {
+            BEAST_EXPECT(
+                info->incomplete().size() + info->finalized().size() ==
+                numberOfShards);
+        }
+
+        return completed;
+    }
+
+public:
+    void
+    testStart()
+    {
+        testcase("Start");
+
+        beast::temp_dir tempDir;
+
+        jtx::Env env = [&] {
+            auto c = jtx::envconfig();
+            auto& section = c->section(ConfigSection::shardDatabase());
+            section.set("path", tempDir.path());
+            section.set("max_historical_shards", "20");
+            section.set("ledgers_per_shard", "256");
+            section.set("earliest_seq", "257");
+            auto& sectionNode = c->section(ConfigSection::nodeDatabase());
+            sectionNode.set("earliest_seq", "257");
+            sectionNode.set("ledgers_per_shard", "256");
+            c->setupControl(true, true, true);
+
+            return jtx::Env(*this, std::move(c));
+        }();
+
+        std::uint8_t const numberOfShards = 10;
+
+        // Create some ledgers so that we can initiate a
+        // shard store database import.
+        for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
+                 (numberOfShards + 1);
+             ++i)
+        {
+            env.close();
+        }
+
+        auto shardStore = env.app().getShardStore();
+        if (!BEAST_EXPECT(shardStore))
+            return;
+
+        {
+            // Initiate a shard store import via the RPC
+            // interface.
+
+            Json::Value jvParams;
+            jvParams[jss::action] = "start";
+
+            auto const result = env.rpc(
+                "json", "node_to_shard", to_string(jvParams))[jss::result];
+
+            BEAST_EXPECT(
+                result[jss::message] == "Database import initiated...");
+        }
+
+        while (!shardStore->getDatabaseImportSequence())
+        {
+            // Wait until the import starts
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+
+        {
+            // Verify that the import is in progress with
+            // the node_to_shard status RPC command
+
+            Json::Value jvParams;
+            jvParams[jss::action] = "status";
+
+            auto const result = env.rpc(
+                "json", "node_to_shard", to_string(jvParams))[jss::result];
+
+            BEAST_EXPECT(
+                result[jss::status] == "success" ||
+                importCompleted(shardStore, numberOfShards, result));
+
+            std::chrono::seconds const maxWait{60};
+            auto const start = std::chrono::system_clock::now();
+
+            while (true)
+            {
+                // Verify that the status object accurately
+                // reflects import progress.
+
+                auto const completeShards =
+                    shardStore->getShardInfo()->finalized();
+
+                if (!completeShards.empty())
+                {
+                    auto const result = env.rpc(
+                        "json",
+                        "node_to_shard",
+                        to_string(jvParams))[jss::result];
+
+                    if (!importCompleted(shardStore, numberOfShards, result))
+                    {
+                        BEAST_EXPECT(result[jss::firstShardIndex] == 1);
+                        BEAST_EXPECT(result[jss::lastShardIndex] == 10);
+                    }
+                }
+
+                if (boost::icl::contains(completeShards, 1))
+                {
+                    auto const result = env.rpc(
+                        "json",
+                        "node_to_shard",
+                        to_string(jvParams))[jss::result];
+
+                    BEAST_EXPECT(
+                        result[jss::currentShardIndex] >= 1 ||
+                        importCompleted(shardStore, numberOfShards, result));
+
+                    break;
+                }
+
+                if (std::this_thread::sleep_for(
+                        std::chrono::milliseconds{100});
+                    std::chrono::system_clock::now() - start > maxWait)
+                {
+                    BEAST_EXPECTS(
+                        false, "Import timeout: could just be a slow machine.");
+                    break;
+                }
+            }
+
+            // Wait for the import to complete
+            while (!boost::icl::contains(
+                shardStore->getShardInfo()->finalized(), 10))
+            {
+                if (std::this_thread::sleep_for(
+                        std::chrono::milliseconds{100});
+                    std::chrono::system_clock::now() - start > maxWait)
+                {
+                    BEAST_EXPECT(
+                        importCompleted(shardStore, numberOfShards, result));
+                    break;
+                }
+            }
+        }
+    }
+
+    void
+    testStop()
+    {
+        testcase("Stop");
+
+        beast::temp_dir tempDir;
+
+        jtx::Env env = [&] {
+            auto c = jtx::envconfig();
+            auto& section = c->section(ConfigSection::shardDatabase());
+            section.set("path", tempDir.path());
+            section.set("max_historical_shards", "20");
+            section.set("ledgers_per_shard", "256");
+            section.set("earliest_seq", "257");
+            auto& sectionNode = c->section(ConfigSection::nodeDatabase());
+            sectionNode.set("earliest_seq", "257");
+            sectionNode.set("ledgers_per_shard", "256");
+            c->setupControl(true, true, true);
+
+            return jtx::Env(*this, std::move(c));
+        }();
+
+        std::uint8_t const numberOfShards = 10;
+
+        // Create some ledgers so that we can initiate a
+        // shard store database import.
+        for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
+                 (numberOfShards + 1);
+             ++i)
+        {
+            env.close();
+        }
+
+        auto shardStore = env.app().getShardStore();
+        if (!BEAST_EXPECT(shardStore))
+            return;
+
+        {
+            // Initiate a shard store import via the RPC
+            // interface.
+
+            Json::Value jvParams;
+            jvParams[jss::action] = "start";
+
+            auto const result = env.rpc(
+                "json", "node_to_shard", to_string(jvParams))[jss::result];
+
+            BEAST_EXPECT(
+                result[jss::message] == "Database import initiated...");
+        }
+
+        {
+            // Verify that the import is in progress with
+            // the node_to_shard status RPC command
+
+            Json::Value jvParams;
+            jvParams[jss::action] = "status";
+
+            auto const result = env.rpc(
+                "json", "node_to_shard", to_string(jvParams))[jss::result];
+
+            BEAST_EXPECT(
+                result[jss::status] == "success" ||
+                importCompleted(shardStore, numberOfShards, result));
+
+            std::chrono::seconds const maxWait{10};
+            auto const start = std::chrono::system_clock::now();
+
+            while (shardStore->getShardInfo()->finalized().empty())
+            {
+                // Wait for at least one shard to complete
+
+                if (std::this_thread::sleep_for(
+                        std::chrono::milliseconds{100});
+                    std::chrono::system_clock::now() - start > maxWait)
+                {
+                    BEAST_EXPECTS(
+                        false, "Import timeout: could just be a slow machine.");
+                    break;
+                }
+            }
+        }
+
+        {
+            Json::Value jvParams;
+            jvParams[jss::action] = "stop";
+
+            auto const result = env.rpc(
+                "json", "node_to_shard", to_string(jvParams))[jss::result];
+
+            BEAST_EXPECT(
+                result[jss::message] == "Database import halt initiated..." ||
+                importCompleted(shardStore, numberOfShards, result));
+        }
+
+        std::chrono::seconds const maxWait{10};
+        auto const start = std::chrono::system_clock::now();
+
+        while (true)
+        {
+            // Wait until we can verify that the import has
+            // stopped
+
+            Json::Value jvParams;
+            jvParams[jss::action] = "status";
+
+            auto const result = env.rpc(
+                "json", "node_to_shard", to_string(jvParams))[jss::result];
+
+            // When the import has stopped, polling the
+            // status returns an error
+            if (result.isMember(jss::error))
+            {
+                if (BEAST_EXPECT(result.isMember(jss::error_message)))
+                {
+                    BEAST_EXPECT(
+                        result[jss::error_message] ==
+                        "Database import not running");
+                }
+
+                break;
+            }
+
+            if (std::this_thread::sleep_for(std::chrono::milliseconds{100});
+                std::chrono::system_clock::now() - start > maxWait)
+            {
+                BEAST_EXPECTS(
+                    false, "Import timeout: could just be a slow machine.");
+                break;
+            }
+        }
+    }
+
+    void
+    run() override
+    {
+        testStart();
+        testStop();
+    }
+};
+
+BEAST_DEFINE_TESTSUITE(NodeToShardRPC, rpc, ripple);
+} // namespace test
+} // namespace ripple
diff --git a/src/test/rpc/RPCCall_test.cpp b/src/test/rpc/RPCCall_test.cpp
index d3565b986..966d325f4 100644
--- a/src/test/rpc/RPCCall_test.cpp
+++ b/src/test/rpc/RPCCall_test.cpp
@@ -4301,28 +4301,53 @@ static RPCCallTestData const rpcCallTestArray[] = {
    ]
    })"},
 
-    // nodetoshard_status
+    // node_to_shard
     // -------------------------------------------------------------------
-    {"nodetoshard_status: minimal.",
+    {"node_to_shard: status.",
     __LINE__,
-     {
-         "nodetoshard_status",
-     },
+     {"node_to_shard", "status"},
     RPCCallTestData::no_exception,
     R"({
-    "method" : "nodetoshard_status",
+    "method" : "node_to_shard",
    "params" : [
       {
          "api_version" : %MAX_API_VER%,
+          "action" : "status"
       }
    ]
    })"},
-    {"nodetoshard_status: too many arguments.",
+    {"node_to_shard: start.",
     __LINE__,
-    {"nodetoshard_status", "extra"},
+     {"node_to_shard", "start"},
     RPCCallTestData::no_exception,
     R"({
-    "method" : "nodetoshard_status",
+    "method" : "node_to_shard",
+    "params" : [
+       {
+          "api_version" : %MAX_API_VER%,
+          "action" : "start"
+       }
+    ]
+    })"},
+    {"node_to_shard: stop.",
+     __LINE__,
+     {"node_to_shard", "stop"},
+     RPCCallTestData::no_exception,
+     R"({
+    "method" : "node_to_shard",
+    "params" : [
+       {
+          "api_version" : %MAX_API_VER%,
+          "action" : "stop"
+       }
+    ]
+    })"},
+    {"node_to_shard: too many arguments.",
+     __LINE__,
+     {"node_to_shard", "start", "stop"},
+     RPCCallTestData::no_exception,
+     R"({
+    "method" : "node_to_shard",
    "params" : [
       {
          "error" : "badSyntax",
@@ -4331,6 +4356,19 @@ static RPCCallTestData const rpcCallTestArray[] = {
       }
    ]
    })"},
+    {"node_to_shard: invalid argument.",
+     __LINE__,
+     {"node_to_shard", "invalid"},
+     RPCCallTestData::no_exception,
+     R"({
+    "method" : "node_to_shard",
+    "params" : [
+       {
+          "api_version" : %MAX_API_VER%,
+          "action" : "invalid"
+       }
+    ]
+    })"},
 
     // owner_info
     // ------------------------------------------------------------------