Implement node-to-shard RPC control

This commit is contained in:
Devon White
2021-04-07 12:03:33 -04:00
committed by manojsdoshi
parent 0320d2169e
commit 00a4c3a478
14 changed files with 666 additions and 95 deletions

View File

@@ -159,7 +159,7 @@ printHelp(const po::options_description& desc)
" ledger_request <ledger>\n"
" log_level [[<partition>] <severity>]\n"
" logrotate\n"
" nodetoshard_status\n"
" node_to_shard [status|start|stop]\n"
" peers\n"
" ping\n"
" random\n"

View File

@@ -29,6 +29,9 @@ NodeStoreScheduler::NodeStoreScheduler(JobQueue& jobQueue) : jobQueue_(jobQueue)
void
NodeStoreScheduler::scheduleTask(NodeStore::Task& task)
{
if (jobQueue_.isStopped())
return;
if (!jobQueue_.addJob(jtWRITE, "NodeObject::store", [&task](Job&) {
task.performScheduledTask();
}))
@@ -42,6 +45,9 @@ NodeStoreScheduler::scheduleTask(NodeStore::Task& task)
void
NodeStoreScheduler::onFetch(NodeStore::FetchReport const& report)
{
if (jobQueue_.isStopped())
return;
jobQueue_.addLoadEvents(
report.fetchType == NodeStore::FetchType::async ? jtNS_ASYNC_READ
: jtNS_SYNC_READ,
@@ -52,6 +58,9 @@ NodeStoreScheduler::onFetch(NodeStore::FetchReport const& report)
void
NodeStoreScheduler::onBatchWrite(NodeStore::BatchWriteReport const& report)
{
if (jobQueue_.isStopped())
return;
jobQueue_.addLoadEvents(jtNS_WRITE, report.writeCount, report.elapsed);
}

View File

@@ -936,6 +936,15 @@ private:
return jvRequest;
}
Json::Value
parseNodeToShard(Json::Value const& jvParams)
{
Json::Value jvRequest;
jvRequest[jss::action] = jvParams[0u].asString();
return jvRequest;
}
// peer_reservations_add <public_key> [<name>]
Json::Value
parsePeerReservationsAdd(Json::Value const& jvParams)
@@ -1257,7 +1266,7 @@ public:
{"log_level", &RPCParser::parseLogLevel, 0, 2},
{"logrotate", &RPCParser::parseAsIs, 0, 0},
{"manifest", &RPCParser::parseManifest, 1, 1},
{"nodetoshard_status", &RPCParser::parseAsIs, 0, 0},
{"node_to_shard", &RPCParser::parseNodeToShard, 1, 1},
{"owner_info", &RPCParser::parseAccountItems, 1, 3},
{"peers", &RPCParser::parseAsIs, 0, 0},
{"ping", &RPCParser::parseAsIs, 0, 0},

View File

@@ -252,9 +252,25 @@ public:
[[nodiscard]] virtual boost::filesystem::path const&
getRootDir() const = 0;
/** Returns a JSON object detailing the status of an ongoing
database import if one is running, otherwise an error
object.
*/
virtual Json::Value
getDatabaseImportStatus() const = 0;
/** Initiates a NodeStore to ShardStore import and returns
the result in a JSON object.
*/
virtual Json::Value
startNodeToShard() = 0;
/** Terminates a NodeStore to ShardStore import and returns
the result in a JSON object.
*/
virtual Json::Value
stopNodeToShard() = 0;
/** Returns the first ledger sequence of the shard currently being imported
from the NodeStore

View File

@@ -722,6 +722,8 @@ DatabaseShardImp::stop()
}
}
std::unique_lock lock(mutex_);
// Notify the shard being imported
// from the node store to stop
if (databaseImportStatus_)
@@ -735,7 +737,28 @@ DatabaseShardImp::stop()
// Wait for the node store import thread
// if necessary
if (databaseImporter_.joinable())
databaseImporter_.join();
{
// Tells the import function to halt
haltDatabaseImport_ = true;
// Wait for the function to exit
while (databaseImportStatus_)
{
// Unlock just in case the import
// function is waiting on the mutex
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
lock.lock();
}
// Calling join while holding the mutex_ without
// first making sure that doImportDatabase has
// exited could lead to deadlock via the mutex
// acquisition that occurs in that function
if (databaseImporter_.joinable())
databaseImporter_.join();
}
}
void
@@ -754,15 +777,19 @@ DatabaseShardImp::importDatabase(Database& source)
return;
}
// Run the lengthy node store import process in the background
// on a dedicated thread.
databaseImporter_ = std::thread([this] { doImportDatabase(); });
startDatabaseImportThread(lock);
}
void
DatabaseShardImp::doImportDatabase()
{
if (isStopping())
auto shouldHalt = [this] {
bool expected = true;
return haltDatabaseImport_.compare_exchange_strong(expected, false) ||
isStopping();
};
if (shouldHalt())
return;
auto loadLedger =
@@ -848,7 +875,7 @@ DatabaseShardImp::doImportDatabase()
for (std::uint32_t shardIndex = earliestIndex; shardIndex <= latestIndex;
++shardIndex)
{
if (isStopping())
if (shouldHalt())
return;
auto const pathDesignation = [this, shardIndex] {
@@ -920,7 +947,7 @@ DatabaseShardImp::doImportDatabase()
continue;
}
if (isStopping())
if (shouldHalt())
return;
bool const needsHistoricalPath =
@@ -938,7 +965,7 @@ DatabaseShardImp::doImportDatabase()
{
std::lock_guard lock(mutex_);
if (isStopping())
if (shouldHalt())
return;
databaseImportStatus_->currentIndex = shardIndex;
@@ -967,7 +994,7 @@ DatabaseShardImp::doImportDatabase()
while (auto const ledgerSeq = shard->prepare())
{
if (isStopping())
if (shouldHalt())
return;
// Not const so it may be moved later
@@ -989,7 +1016,7 @@ DatabaseShardImp::doImportDatabase()
recentStored = std::move(ledger);
}
if (isStopping())
if (shouldHalt())
return;
using namespace boost::filesystem;
@@ -1044,17 +1071,14 @@ DatabaseShardImp::doImportDatabase()
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to import from the NodeStore";
shard->removeOnDestroy();
if (shard)
shard->removeOnDestroy();
}
}
{
std::lock_guard lock(mutex_);
if (isStopping())
return;
databaseImportStatus_.reset();
}
if (shouldHalt())
return;
updateFileStats();
}
@@ -1200,10 +1224,10 @@ DatabaseShardImp::sweep()
Json::Value
DatabaseShardImp::getDatabaseImportStatus() const
{
Json::Value ret(Json::objectValue);
if (std::lock_guard lock(mutex_); databaseImportStatus_)
{
Json::Value ret(Json::objectValue);
ret[jss::firstShardIndex] = databaseImportStatus_->earliestIndex;
ret[jss::lastShardIndex] = databaseImportStatus_->latestIndex;
ret[jss::currentShardIndex] = databaseImportStatus_->currentIndex;
@@ -1216,11 +1240,59 @@ DatabaseShardImp::getDatabaseImportStatus() const
currentShard[jss::storedSeqs] = shard->getStoredSeqs();
ret[jss::currentShard] = currentShard;
}
else
ret = "Database import not running";
return ret;
if (haltDatabaseImport_)
ret[jss::message] = "Database import halt initiated...";
return ret;
}
return RPC::make_error(rpcINTERNAL, "Database import not running");
}
Json::Value
DatabaseShardImp::startNodeToShard()
{
std::lock_guard lock(mutex_);
if (!init_)
return RPC::make_error(rpcINTERNAL, "Shard store not initialized");
if (databaseImporter_.joinable())
return RPC::make_error(
rpcINTERNAL, "Database import already in progress");
if (isStopping())
return RPC::make_error(rpcINTERNAL, "Node is shutting down");
startDatabaseImportThread(lock);
Json::Value result(Json::objectValue);
result[jss::message] = "Database import initiated...";
return result;
}
Json::Value
DatabaseShardImp::stopNodeToShard()
{
std::lock_guard lock(mutex_);
if (!init_)
return RPC::make_error(rpcINTERNAL, "Shard store not initialized");
if (!databaseImporter_.joinable())
return RPC::make_error(rpcINTERNAL, "Database import not running");
if (isStopping())
return RPC::make_error(rpcINTERNAL, "Node is shutting down");
haltDatabaseImport_ = true;
Json::Value result(Json::objectValue);
result[jss::message] = "Database import halt initiated...";
return result;
}
std::optional<std::uint32_t>
@@ -2131,6 +2203,27 @@ DatabaseShardImp::updatePeers(std::lock_guard<std::mutex> const& lock) const
}
}
void
DatabaseShardImp::startDatabaseImportThread(std::lock_guard<std::mutex> const&)
{
// Run the lengthy node store import process in the background
// on a dedicated thread.
databaseImporter_ = std::thread([this] {
doImportDatabase();
std::lock_guard lock(mutex_);
// Make sure to clear this in case the import
// exited early.
databaseImportStatus_.reset();
// Detach the thread so subsequent attempts
// to start the import won't get held up by
// the old thread of execution
databaseImporter_.detach();
});
}
//------------------------------------------------------------------------------
std::unique_ptr<DatabaseShard>

View File

@@ -135,6 +135,12 @@ public:
Json::Value
getDatabaseImportStatus() const override;
Json::Value
startNodeToShard() override;
Json::Value
stopNodeToShard() override;
std::optional<std::uint32_t>
getDatabaseImportSequence() const override;
@@ -285,6 +291,9 @@ private:
// Thread for running node store import
std::thread databaseImporter_;
// Indicates whether the import should stop
std::atomic_bool haltDatabaseImport_{false};
// Initialize settings from the configuration file
// Lock must be held
bool
@@ -407,6 +416,10 @@ private:
// Update peers with the status of every complete and incomplete shard
void
updatePeers(std::lock_guard<std::mutex> const& lock) const;
// Start the node store import process
void
startDatabaseImportThread(std::lock_guard<std::mutex> const&);
};
} // namespace NodeStore

View File

@@ -89,7 +89,7 @@ doLogRotate(RPC::JsonContext&);
Json::Value
doManifest(RPC::JsonContext&);
Json::Value
doNodeToShardStatus(RPC::JsonContext&);
doNodeToShard(RPC::JsonContext&);
Json::Value
doNoRippleCheck(RPC::JsonContext&);
Json::Value

View File

@@ -0,0 +1,86 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2021 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/json/json_value.h>
#include <ripple/net/RPCErr.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/jss.h>
#include <ripple/rpc/Context.h>
namespace ripple {
// node_to_shard [status|start|stop]
Json::Value
doNodeToShard(RPC::JsonContext& context)
{
if (context.app.config().reporting())
return rpcError(rpcREPORTING_UNSUPPORTED);
// Shard store must be enabled
auto const shardStore = context.app.getShardStore();
if (!shardStore)
return rpcError(rpcINTERNAL, "No shard store");
if (!context.params.isMember(jss::action))
return RPC::missing_field_error(jss::action);
// Obtain and normalize the action to perform
auto const action = [&context] {
auto value = context.params[jss::action].asString();
boost::to_lower(value);
return value;
}();
// Vector of allowed actions
std::vector<std::string> const allowedActions = {"status", "start", "stop"};
// Validate the action
if (std::find(allowedActions.begin(), allowedActions.end(), action) ==
allowedActions.end())
return RPC::invalid_field_error(jss::action);
// Perform the action
if (action == "status")
{
// Get the status of the database import
return shardStore->getDatabaseImportStatus();
}
else if (action == "start")
{
// Kick off an import
return shardStore->startNodeToShard();
}
else if (action == "stop")
{
// Halt an import
return shardStore->stopNodeToShard();
}
else
{
// Shouldn't happen
assert(false);
return rpcError(rpcINTERNAL);
}
}
} // namespace ripple

View File

@@ -1,43 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2021 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/json/json_value.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/jss.h>
#include <ripple/rpc/Context.h>
namespace ripple {
Json::Value
doNodeToShardStatus(RPC::JsonContext& context)
{
Json::Value ret(Json::objectValue);
if (auto const shardStore = context.app.getShardStore())
ret[jss::info] = shardStore->getDatabaseImportStatus();
else
ret = RPC::make_error(rpcINTERNAL, "No shard store");
return ret;
}
} // namespace ripple

View File

@@ -107,10 +107,7 @@ Handler const handlerArray[]{
{"log_level", byRef(&doLogLevel), Role::ADMIN, NO_CONDITION},
{"logrotate", byRef(&doLogRotate), Role::ADMIN, NO_CONDITION},
{"manifest", byRef(&doManifest), Role::USER, NO_CONDITION},
{"nodetoshard_status",
byRef(&doNodeToShardStatus),
Role::USER,
NO_CONDITION},
{"node_to_shard", byRef(&doNodeToShard), Role::ADMIN, NO_CONDITION},
{"noripple_check", byRef(&doNoRippleCheck), Role::USER, NO_CONDITION},
{"owner_info", byRef(&doOwnerInfo), Role::USER, NEEDS_CURRENT_LEDGER},
{"peers", byRef(&doPeers), Role::ADMIN, NO_CONDITION},