From a73372cb9dd8470a64e2edefc069311d62278a23 Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Thu, 15 Mar 2018 09:17:02 -0400 Subject: [PATCH] Add RPC shard download --- cfg/rippled-example.cfg | 3 +- src/ripple/app/main/Main.cpp | 5 +- src/ripple/basics/StringUtilities.h | 9 + src/ripple/net/impl/RPCCall.cpp | 32 +++ src/ripple/protocol/JsonFields.h | 9 +- src/ripple/rpc/ShardArchiveHandler.h | 97 +++++++ src/ripple/rpc/handlers/DownloadShard.cpp | 151 +++++++++++ src/ripple/rpc/handlers/Handlers.h | 1 + src/ripple/rpc/impl/Handler.cpp | 1 + src/ripple/rpc/impl/ShardArchiveHandler.cpp | 265 ++++++++++++++++++++ src/ripple/unity/rpcx1.cpp | 1 + src/ripple/unity/rpcx2.cpp | 1 + 12 files changed, 568 insertions(+), 7 deletions(-) create mode 100644 src/ripple/rpc/ShardArchiveHandler.h create mode 100644 src/ripple/rpc/handlers/DownloadShard.cpp create mode 100644 src/ripple/rpc/impl/ShardArchiveHandler.cpp diff --git a/cfg/rippled-example.cfg b/cfg/rippled-example.cfg index fbcf2e290d..5b20becb23 100644 --- a/cfg/rippled-example.cfg +++ b/cfg/rippled-example.cfg @@ -847,11 +847,12 @@ # # Example: # type=nudb -# path=db/nudb +# path=db/shards/nudb # # The "type" field must be present and controls the choice of backend: # # type = NuDB +# NuDB is recommended for shards. # # type = RocksDB # diff --git a/src/ripple/app/main/Main.cpp b/src/ripple/app/main/Main.cpp index b438dfdecf..a8a70d1052 100644 --- a/src/ripple/app/main/Main.cpp +++ b/src/ripple/app/main/Main.cpp @@ -144,7 +144,8 @@ void printHelp (const po::options_description& desc) " channel_verify \n" " connect []\n" " consensus_info\n" - " deposit_authorized []" + " deposit_authorized []\n" + " download_shard [[ ]] \n" " feature [ [accept|reject]]\n" " fetch_info [clear]\n" " gateway_balances [] [ [ ]]\n" @@ -579,7 +580,7 @@ int run (int argc, char** argv) if (vm.count("nodetoshard")) config->nodeToShard = true; - if (vm.count ("validateShards ")) + if (vm.count ("validateShards")) config->validateShards = true; if (vm.count ("ledger")) diff --git a/src/ripple/basics/StringUtilities.h b/src/ripple/basics/StringUtilities.h index 18e718fca9..2f5b5b6e23 100644 --- a/src/ripple/basics/StringUtilities.h +++ b/src/ripple/basics/StringUtilities.h @@ -92,6 +92,15 @@ struct parsedURL std::string domain; boost::optional port; std::string path; + + bool + operator == (parsedURL const& other) const + { + return scheme == other.scheme && + domain == other.domain && + port == other.port && + path == other.path; + } }; bool parseUrl (parsedURL& pUrl, std::string const& strUrl); diff --git a/src/ripple/net/impl/RPCCall.cpp b/src/ripple/net/impl/RPCCall.cpp index a91bbd6d95..b73af15707 100644 --- a/src/ripple/net/impl/RPCCall.cpp +++ b/src/ripple/net/impl/RPCCall.cpp @@ -151,6 +151,37 @@ private: return v; } + Json::Value parseDownloadShard(Json::Value const& jvParams) + { + Json::Value jvResult(Json::objectValue); + unsigned int sz {jvParams.size()}; + unsigned int i {0}; + + // If odd number of params then 'novalidate' may have been specified + if (sz & 1) + { + using namespace boost::beast::detail; + if (iequals(jvParams[0u].asString(), "novalidate")) + ++i; + else if (!iequals(jvParams[--sz].asString(), "novalidate")) + return rpcError(rpcINVALID_PARAMS); + jvResult[jss::validate] = false; + } + + // Create the 'shards' array + Json::Value shards(Json::arrayValue); + for (; i < sz; i += 2) + { + Json::Value shard(Json::objectValue); + shard[jss::index] = jvParams[i].asUInt(); + shard[jss::url] = jvParams[i + 1].asString(); + shards.append(std::move(shard)); + } + jvResult[jss::shards] = std::move(shards); + + return jvResult; + } + Json::Value parseInternal (Json::Value const& jvParams) { Json::Value v (Json::objectValue); @@ -1085,6 +1116,7 @@ public: { "connect", &RPCParser::parseConnect, 1, 2 }, { "consensus_info", &RPCParser::parseAsIs, 0, 0 }, { "deposit_authorized", &RPCParser::parseDepositAuthorized, 2, 3 }, + { "download_shard", &RPCParser::parseDownloadShard, 2, -1 }, { "feature", &RPCParser::parseFeature, 0, 2 }, { "fetch_info", &RPCParser::parseFetchInfo, 0, 1 }, { "gateway_balances", &RPCParser::parseGatewayBalances, 1, -1 }, diff --git a/src/ripple/protocol/JsonFields.h b/src/ripple/protocol/JsonFields.h index c4bf5e8ace..70c2cc2bdb 100644 --- a/src/ripple/protocol/JsonFields.h +++ b/src/ripple/protocol/JsonFields.h @@ -210,9 +210,9 @@ JSS ( ident ); // in: AccountCurrencies, AccountInfo, // OwnerInfo JSS ( inLedger ); // out: tx/Transaction JSS ( inbound ); // out: PeerImp -JSS ( index ); // in: LedgerEntry; out: PathState, - // STLedgerEntry, LedgerEntry, - // TxHistory, LedgerData; +JSS ( index ); // in: LedgerEntry, DownloadShard + // out: PathState, STLedgerEntry, + // LedgerEntry, TxHistory, LedgerData // field JSS ( info ); // out: ServerInfo, ConsensusInfo, FetchInfo JSS ( internal_command ); // in: Internal @@ -401,7 +401,7 @@ JSS ( server_state ); // out: NetworkOPs JSS ( server_status ); // out: NetworkOPs JSS ( settle_delay ); // out: AccountChannels JSS ( severity ); // in: LogLevel -JSS ( shards ); // out: GetCounts +JSS ( shards ); // in/out: GetCounts, DownloadShard JSS ( signature ); // out: NetworkOPs, ChannelAuthorize JSS ( signature_verified ); // out: ChannelVerify JSS ( signing_key ); // out: NetworkOPs @@ -477,6 +477,7 @@ JSS ( url_password ); // in: Subscribe JSS ( url_username ); // in: Subscribe JSS ( urlgravatar ); // JSS ( username ); // in: Subscribe +JSS ( validate ); // in: DownloadShard JSS ( validated ); // out: NetworkOPs, RPCHelpers, AccountTx* // Tx JSS ( validator_list_expires ); // out: NetworkOps, ValidatorList diff --git a/src/ripple/rpc/ShardArchiveHandler.h b/src/ripple/rpc/ShardArchiveHandler.h new file mode 100644 index 0000000000..45c0b4556b --- /dev/null +++ b/src/ripple/rpc/ShardArchiveHandler.h @@ -0,0 +1,97 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_RPC_SHARDARCHIVEHANDLER_H_INCLUDED +#define RIPPLE_RPC_SHARDARCHIVEHANDLER_H_INCLUDED + +#include +#include +#include +#include + +#include +#include + +namespace ripple { +namespace RPC { + +/** Handles the download and import one or more shard archives. */ +class ShardArchiveHandler + : public std::enable_shared_from_this +{ +public: + ShardArchiveHandler() = delete; + ShardArchiveHandler(ShardArchiveHandler const&) = delete; + ShardArchiveHandler& operator= (ShardArchiveHandler&&) = delete; + ShardArchiveHandler& operator= (ShardArchiveHandler const&) = delete; + + /** @param validate if shard data should be verified with network. */ + ShardArchiveHandler(Application& app, bool validate); + + ~ShardArchiveHandler(); + + /** Initializes the handler. + @return `true` if successfully initialized. + */ + bool + init(); + + /** Queue an archive to be downloaded and imported. + @param shardIndex the index of the shard to be imported. + @param url the location of the archive. + @return `true` if successfully added. + */ + bool + add(std::uint32_t shardIndex, parsedURL&& url); + + /** Starts downloading and importing of queued archives. */ + void + next(); + + /** Returns indexes of queued archives. + @return indexes of queued archives. + */ + std::string + toString() const; + +private: + // The callback used by the downloader to notify completion of a download. + void + complete(boost::filesystem::path dstPath); + + // A job to extract an archive and import a shard. + void + process(boost::filesystem::path const& dstPath); + + void + remove(std::uint32_t shardIndex); + + Application& app_; + std::shared_ptr downloader_; + std::map archives_; + bool const validate_; + boost::filesystem::path const downloadDir_; + boost::asio::basic_waitable_timer timer_; + beast::Journal j_; +}; + +} // RPC +} // ripple + +#endif diff --git a/src/ripple/rpc/handlers/DownloadShard.cpp b/src/ripple/rpc/handlers/DownloadShard.cpp new file mode 100644 index 0000000000..883683260e --- /dev/null +++ b/src/ripple/rpc/handlers/DownloadShard.cpp @@ -0,0 +1,151 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2014 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace ripple { + +/** RPC command that downloads and import shard archives. + { + shards: [{index: , url: }] + validate: // optional, default is true + } + + example: + { + "command": "download_shard", + "shards": [ + {"index": 1, "url": "https://domain.com/1.tar.lz4"}, + {"index": 5, "url": "https://domain.com/5.tar.lz4"} + ] + } +*/ +Json::Value +doDownloadShard(RPC::Context& context) +{ + if (context.role != Role::ADMIN) + return rpcError(rpcNO_PERMISSION); + + // The shard store must be configured + auto shardStore {context.app.getShardStore()}; + if (!shardStore) + return rpcError(rpcNOT_ENABLED); + + // Deny request if already downloading + if (shardStore->getNumPreShard()) + return rpcError(rpcTOO_BUSY); + + if (!context.params.isMember(jss::shards)) + return RPC::missing_field_error(jss::shards); + if (!context.params[jss::shards].isArray() || + context.params[jss::shards].size() == 0) + { + return RPC::expected_field_error( + std::string(jss::shards), "an array"); + } + + // Validate shards + static const std::string ext {".tar.lz4"}; + std::map archives; + for (auto& it : context.params[jss::shards]) + { + // Validate the index + if (!it.isMember(jss::index)) + return RPC::missing_field_error(jss::index); + auto& jv {it[jss::index]}; + if (!(jv.isUInt() || (jv.isInt() && jv.asInt() >= 0))) + { + return RPC::expected_field_error( + std::string(jss::index), "an unsigned integer"); + } + + // Validate the URL + if (!it.isMember(jss::url)) + return RPC::missing_field_error(jss::url); + parsedURL url; + if (!parseUrl(url, it[jss::url].asString()) || + url.domain.empty() || url.path.empty()) + { + return RPC::invalid_field_error(jss::url); + } + if (url.scheme != "https") + return RPC::expected_field_error(std::string(jss::url), "HTTPS"); + + // URL must point to an lz4 compressed tar archive '.tar.lz4' + auto archiveName {url.path.substr(url.path.find_last_of("/\\") + 1)}; + if (archiveName.empty() || archiveName.size() <= ext.size()) + { + return RPC::make_param_error("Invalid field '" + + std::string(jss::url) + "', invalid archive name"); + } + if (!boost::iends_with(archiveName, ext)) + { + return RPC::make_param_error("Invalid field '" + + std::string(jss::url) + "', invalid archive extension"); + } + + // Check for duplicate indexes + if (!archives.emplace(jv.asUInt(), std::move(url)).second) + { + return RPC::make_param_error("Invalid field '" + + std::string(jss::index) + "', duplicate shard ids."); + } + } + + bool validate {true}; + if (context.params.isMember(jss::validate)) + { + if (!context.params[jss::validate].isBool()) + { + return RPC::expected_field_error( + std::string(jss::validate), "a bool"); + } + validate = context.params[jss::validate].asBool(); + } + + // Begin downloading. The handler keeps itself alive while downloading. + auto handler { + std::make_shared(context.app, validate)}; + if (!handler->init()) + return rpcError(rpcINTERNAL); + for (auto& ar : archives) + { + if (!handler->add(ar.first, std::move(ar.second))) + { + return RPC::make_param_error("Invalid field '" + + std::string(jss::index) + "', shard id " + + std::to_string(ar.first) + " exists or being acquired"); + } + } + handler->next(); + + return RPC::makeObjectValue("downloading shards " + handler->toString()); +} + +} // ripple diff --git a/src/ripple/rpc/handlers/Handlers.h b/src/ripple/rpc/handlers/Handlers.h index 06e2cbed59..b5f82bc654 100644 --- a/src/ripple/rpc/handlers/Handlers.h +++ b/src/ripple/rpc/handlers/Handlers.h @@ -41,6 +41,7 @@ Json::Value doChannelVerify (RPC::Context&); Json::Value doConnect (RPC::Context&); Json::Value doConsensusInfo (RPC::Context&); Json::Value doDepositAuthorized (RPC::Context&); +Json::Value doDownloadShard (RPC::Context&); Json::Value doFeature (RPC::Context&); Json::Value doFee (RPC::Context&); Json::Value doFetchInfo (RPC::Context&); diff --git a/src/ripple/rpc/impl/Handler.cpp b/src/ripple/rpc/impl/Handler.cpp index 1727359e7f..f6e38778e3 100644 --- a/src/ripple/rpc/impl/Handler.cpp +++ b/src/ripple/rpc/impl/Handler.cpp @@ -73,6 +73,7 @@ Handler const handlerArray[] { { "connect", byRef (&doConnect), Role::ADMIN, NO_CONDITION }, { "consensus_info", byRef (&doConsensusInfo), Role::ADMIN, NO_CONDITION }, { "deposit_authorized", byRef (&doDepositAuthorized), Role::USER, NO_CONDITION }, + { "download_shard", byRef (&doDownloadShard), Role::ADMIN, NO_CONDITION }, { "gateway_balances", byRef (&doGatewayBalances), Role::USER, NO_CONDITION }, { "get_counts", byRef (&doGetCounts), Role::ADMIN, NO_CONDITION }, { "feature", byRef (&doFeature), Role::ADMIN, NO_CONDITION }, diff --git a/src/ripple/rpc/impl/ShardArchiveHandler.cpp b/src/ripple/rpc/impl/ShardArchiveHandler.cpp new file mode 100644 index 0000000000..6bcf37ea29 --- /dev/null +++ b/src/ripple/rpc/impl/ShardArchiveHandler.cpp @@ -0,0 +1,265 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2014 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include + +#include + +namespace ripple { +namespace RPC { + +using namespace boost::filesystem; +using namespace std::chrono_literals; + +ShardArchiveHandler::ShardArchiveHandler(Application& app, bool validate) + : app_(app) + , validate_(validate) + , downloadDir_(get(app_.config().section( + ConfigSection::shardDatabase()), "path", "") + "/download") + , timer_(app_.getIOService()) + , j_(app.journal("ShardArchiveHandler")) +{ + assert(app_.getShardStore()); +} + +ShardArchiveHandler::~ShardArchiveHandler() +{ + timer_.cancel(); + for (auto const& ar : archives_) + app_.getShardStore()->removePreShard(ar.first); + + // Remove temp root download directory + try + { + remove_all(downloadDir_); + } + catch (std::exception const& e) + { + JLOG(j_.error()) << + "exception: " << e.what(); + } +} + +bool +ShardArchiveHandler::init() +{ + if (!app_.getShardStore()) + { + JLOG(j_.error()) << + "No shard store available"; + return false; + } + if (downloader_) + { + JLOG(j_.error()) << + "Already initialized"; + return false; + } + + try + { + // Remove if remnant from a crash + remove_all(downloadDir_); + + // Create temp root download directory + create_directory(downloadDir_); + } + catch (std::exception const& e) + { + JLOG(j_.error()) << + "exception: " << e.what(); + return false; + } + + downloader_ = std::make_shared( + app_.getIOService(), j_); + return downloader_->init(app_.config()); +} + +bool +ShardArchiveHandler::add(std::uint32_t shardIndex, parsedURL&& url) +{ + assert(downloader_); + auto const it {archives_.find(shardIndex)}; + if (it != archives_.end()) + return url == it->second; + if (!app_.getShardStore()->prepareShard(shardIndex)) + return false; + archives_.emplace(shardIndex, std::move(url)); + return true; +} + +void +ShardArchiveHandler::next() +{ + assert(downloader_); + + // Check if all archives completed + if (archives_.empty()) + return; + + // Create a temp archive directory at the root + auto const dstDir { + downloadDir_ / std::to_string(archives_.begin()->first)}; + try + { + create_directory(dstDir); + } + catch (std::exception const& e) + { + JLOG(j_.error()) << + "exception: " << e.what(); + remove(archives_.begin()->first); + return next(); + } + + // Download the archive + auto const& url {archives_.begin()->second}; + downloader_->download( + url.domain, + std::to_string(url.port.get_value_or(443)), + url.path, + 11, + dstDir / "archive.tar.lz4", + std::bind(&ShardArchiveHandler::complete, + shared_from_this(), std::placeholders::_1)); +} + +std::string +ShardArchiveHandler::toString() const +{ + assert(downloader_); + RangeSet rs; + for (auto const& ar : archives_) + rs.insert(ar.first); + return to_string(rs); +}; + +void +ShardArchiveHandler::complete(path dstPath) +{ + try + { + if (!is_regular_file(dstPath)) + { + auto ar {archives_.begin()}; + JLOG(j_.error()) << + "Downloading shard id " << ar->first << + " URL " << ar->second.domain << ar->second.path; + remove(ar->first); + return next(); + } + } + catch (std::exception const& e) + { + JLOG(j_.error()) << + "exception: " << e.what(); + remove(archives_.begin()->first); + return next(); + } + + // Process in another thread to not hold up the IO service + app_.getJobQueue().addJob( + jtCLIENT, "ShardArchiveHandler", + [=, dstPath = std::move(dstPath), ptr = shared_from_this()](Job&) + { + // If validating and not synced then defer and retry + auto const mode {ptr->app_.getOPs().getOperatingMode()}; + if (ptr->validate_ && mode != NetworkOPs::omFULL) + { + timer_.expires_from_now(static_cast( + (NetworkOPs::omFULL - mode) * 10)); + timer_.async_wait( + [=, dstPath = std::move(dstPath), ptr = std::move(ptr)] + (boost::system::error_code const& ec) + { + if (ec != boost::asio::error::operation_aborted) + ptr->complete(std::move(dstPath)); + }); + return; + } + ptr->process(dstPath); + ptr->next(); + }); +} + +void +ShardArchiveHandler::process(path const& dstPath) +{ + auto const shardIndex {archives_.begin()->first}; + auto const shardDir {dstPath.parent_path() / std::to_string(shardIndex)}; + try + { + // Decompress and extract the downloaded file + extractTarLz4(dstPath, dstPath.parent_path()); + + // The extracted root directory name must match the shard index + if (!is_directory(shardDir)) + { + JLOG(j_.error()) << + "Shard " << shardIndex << + " mismatches archive shard directory"; + return remove(shardIndex); + } + } + catch (std::exception const& e) + { + JLOG(j_.error()) << + "exception: " << e.what(); + return remove(shardIndex); + } + + // Import the shard into the shard store + if (!app_.getShardStore()->importShard(shardIndex, shardDir, validate_)) + { + JLOG(j_.error()) << + "Importing shard " << shardIndex; + } + else + { + JLOG(j_.debug()) << + "Shard " << shardIndex << " downloaded and imported"; + } + remove(shardIndex); +} + +void +ShardArchiveHandler::remove(std::uint32_t shardIndex) +{ + app_.getShardStore()->removePreShard(shardIndex); + archives_.erase(shardIndex); + + auto const dstDir {downloadDir_ / std::to_string(shardIndex)}; + try + { + remove_all(dstDir); + } + catch (std::exception const& e) + { + JLOG(j_.error()) << + "exception: " << e.what(); + } +} + +} // RPC +} // ripple diff --git a/src/ripple/unity/rpcx1.cpp b/src/ripple/unity/rpcx1.cpp index 4fb80d0afd..7541397a1b 100644 --- a/src/ripple/unity/rpcx1.cpp +++ b/src/ripple/unity/rpcx1.cpp @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include diff --git a/src/ripple/unity/rpcx2.cpp b/src/ripple/unity/rpcx2.cpp index 5c0f3e873c..37f82dbc54 100644 --- a/src/ripple/unity/rpcx2.cpp +++ b/src/ripple/unity/rpcx2.cpp @@ -57,6 +57,7 @@ #include #include #include +#include #include #include