diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 8d99d6c76..6382cfe59 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -513,7 +513,6 @@ target_sources (rippled PRIVATE src/ripple/nodestore/impl/EncodedBlob.cpp src/ripple/nodestore/impl/ManagerImp.cpp src/ripple/nodestore/impl/NodeObject.cpp - src/ripple/nodestore/impl/RetryFinalize.cpp src/ripple/nodestore/impl/Shard.cpp src/ripple/nodestore/impl/TaskQueue.cpp #[===============================[ @@ -623,6 +622,7 @@ target_sources (rippled PRIVATE src/ripple/rpc/impl/Role.cpp src/ripple/rpc/impl/ServerHandlerImp.cpp src/ripple/rpc/impl/ShardArchiveHandler.cpp + src/ripple/rpc/impl/ShardVerificationScheduler.cpp src/ripple/rpc/impl/Status.cpp src/ripple/rpc/impl/TransactionSign.cpp @@ -837,7 +837,7 @@ target_sources (rippled PRIVATE test sources: subdir: net #]===============================] - src/test/net/SSLHTTPDownloader_test.cpp + src/test/net/DatabaseDownloader_test.cpp #[===============================[ test sources: subdir: nodestore diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index e2f6de3bc..81c3c20de 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -349,6 +349,7 @@ public: detail::AppFamily family_; std::unique_ptr shardStore_; std::unique_ptr shardFamily_; + std::unique_ptr shardArchiveHandler_; // VFALCO TODO Make OrderBookDB abstract OrderBookDB m_orderBookDB; std::unique_ptr m_pathRequests; @@ -786,6 +787,64 @@ public: return shardStore_.get(); } + RPC::ShardArchiveHandler* + getShardArchiveHandler(bool tryRecovery) override + { + static std::mutex handlerMutex; + std::lock_guard lock(handlerMutex); + + // After constructing the handler, try to + // initialize it. Log on error; set the + // member variable on success. 
+ auto initAndSet = + [this](std::unique_ptr&& handler) { + if (!handler) + return false; + + if (!handler->init()) + { + JLOG(m_journal.error()) + << "Failed to initialize ShardArchiveHandler."; + + return false; + } + + shardArchiveHandler_ = std::move(handler); + return true; + }; + + // Need to resume based on state from a previous + // run. + if (tryRecovery) + { + if (shardArchiveHandler_ != nullptr) + { + JLOG(m_journal.error()) + << "ShardArchiveHandler already created at startup."; + + return nullptr; + } + + auto handler = RPC::ShardArchiveHandler::tryMakeRecoveryHandler( + *this, *m_jobQueue); + + if (!initAndSet(std::move(handler))) + return nullptr; + } + + // Construct the ShardArchiveHandler + if (shardArchiveHandler_ == nullptr) + { + auto handler = RPC::ShardArchiveHandler::makeShardArchiveHandler( + *this, *m_jobQueue); + + if (!initAndSet(std::move(handler))) + return nullptr; + } + + return shardArchiveHandler_.get(); + } + Application::MutexType& getMasterMutex() override { @@ -1714,30 +1773,16 @@ ApplicationImp::setup() if (shardStore_) { - using namespace boost::filesystem; - - auto stateDb( - RPC::ShardArchiveHandler::getDownloadDirectory(*config_) / - stateDBName); - try { - if (exists(stateDb) && is_regular_file(stateDb) && - !RPC::ShardArchiveHandler::hasInstance()) + // Create a ShardArchiveHandler if recovery + // is needed (there's a state database left + // over from a previous run). + auto handler = getShardArchiveHandler(true); + + // Recovery is needed. 
+ if (handler) { - auto handler = RPC::ShardArchiveHandler::recoverInstance( - *this, *m_jobQueue); - - assert(handler); - - if (!handler->initFromDB()) - { - JLOG(m_journal.fatal()) - << "Failed to initialize ShardArchiveHandler."; - - return false; - } - if (!handler->start()) { JLOG(m_journal.fatal()) diff --git a/src/ripple/app/main/Application.h b/src/ripple/app/main/Application.h index 6b7fc7d74..71746cdbd 100644 --- a/src/ripple/app/main/Application.h +++ b/src/ripple/app/main/Application.h @@ -46,6 +46,9 @@ class DatabaseShard; namespace perf { class PerfLog; } +namespace RPC { +class ShardArchiveHandler; +} // VFALCO TODO Fix forward declares required for header dependency loops class AmendmentTable; @@ -185,6 +188,8 @@ public: getNodeStore() = 0; virtual NodeStore::DatabaseShard* getShardStore() = 0; + virtual RPC::ShardArchiveHandler* + getShardArchiveHandler(bool tryRecovery = false) = 0; virtual InboundLedgers& getInboundLedgers() = 0; virtual InboundTransactions& diff --git a/src/ripple/net/DatabaseBody.h b/src/ripple/net/DatabaseBody.h index c616aeb9b..c12d8de90 100644 --- a/src/ripple/net/DatabaseBody.h +++ b/src/ripple/net/DatabaseBody.h @@ -28,6 +28,10 @@ namespace ripple { +// DatabaseBody needs to meet requirements +// from asio which is why some conventions +// used elsewhere in this code base are not +// followed. struct DatabaseBody { // Algorithm for storing buffers when parsing. 
@@ -53,15 +57,15 @@ class DatabaseBody::value_type friend struct DatabaseBody; // The cached file size - std::uint64_t file_size_ = 0; + std::uint64_t fileSize_ = 0; boost::filesystem::path path_; std::unique_ptr conn_; std::string batch_; std::shared_ptr strand_; std::mutex m_; std::condition_variable c_; - uint64_t handler_count_ = 0; - uint64_t part_ = 0; + std::uint64_t handlerCount_ = 0; + std::uint64_t part_ = 0; bool closing_ = false; public: @@ -75,14 +79,14 @@ public: bool is_open() const { - return bool{conn_}; + return static_cast(conn_); } /// Returns the size of the file if open std::uint64_t size() const { - return file_size_; + return fileSize_; } /// Close the file if open @@ -93,7 +97,9 @@ public: @param path The utf-8 encoded path to the file - @param mode The file mode to use + @param config The configuration settings + + @param io_service The asio context for running a strand. @param ec Set to the error, if any occurred */ @@ -114,9 +120,9 @@ class DatabaseBody::reader { value_type& body_; // The body we are writing to - static const uint32_t FLUSH_SIZE = 50000000; - static const uint8_t MAX_HANDLERS = 3; - static const uint16_t MAX_ROW_SIZE_PAD = 500; + static constexpr std::uint32_t FLUSH_SIZE = 50000000; + static constexpr std::uint8_t MAX_HANDLERS = 3; + static constexpr std::uint16_t MAX_ROW_SIZE_PAD = 500; public: // Constructor. 
diff --git a/src/ripple/net/DatabaseDownloader.h b/src/ripple/net/DatabaseDownloader.h index ec86242c3..e55e9a3a9 100644 --- a/src/ripple/net/DatabaseDownloader.h +++ b/src/ripple/net/DatabaseDownloader.h @@ -34,7 +34,8 @@ public: Config const& config); private: - static const uint8_t MAX_PATH_LEN = std::numeric_limits::max(); + static const std::uint8_t MAX_PATH_LEN = + std::numeric_limits::max(); std::shared_ptr getParser( @@ -48,7 +49,7 @@ private: void closeBody(std::shared_ptr p) override; - uint64_t + std::uint64_t size(std::shared_ptr p) override; Config const& config_; diff --git a/src/ripple/net/SSLHTTPDownloader.h b/src/ripple/net/SSLHTTPDownloader.h index f8499d049..624eae25f 100644 --- a/src/ripple/net/SSLHTTPDownloader.h +++ b/src/ripple/net/SSLHTTPDownloader.h @@ -41,7 +41,7 @@ namespace ripple { /** Provides an asynchronous HTTPS file downloader */ -class SSLHTTPDownloader : public std::enable_shared_from_this +class SSLHTTPDownloader { public: using error_code = boost::system::error_code; @@ -49,8 +49,7 @@ public: SSLHTTPDownloader( boost::asio::io_service& io_service, beast::Journal j, - Config const& config, - bool isPaused = false); + Config const& config); bool download( @@ -71,13 +70,13 @@ protected: beast::Journal const j_; - bool + void fail( boost::filesystem::path dstPath, std::function const& complete, boost::system::error_code const& ec, std::string const& errMsg, - std::shared_ptr parser = nullptr); + std::shared_ptr parser); private: HTTPClientSSLContext ssl_ctx_; @@ -85,9 +84,11 @@ private: boost::optional> stream_; boost::beast::flat_buffer read_buf_; - std::atomic isStopped_; - bool sessionActive_; + std::atomic cancelDownloads_; + + // Used to protect sessionActive_ std::mutex m_; + bool sessionActive_; std::condition_variable c_; void diff --git a/src/ripple/net/ShardDownloader.md b/src/ripple/net/ShardDownloader.md index 9ef643a96..a0617f9f0 100644 --- a/src/ripple/net/ShardDownloader.md +++ b/src/ripple/net/ShardDownloader.md @@ 
-1,17 +1,18 @@ -# Shard Downloader Process +# Shard Downloader ## Overview This document describes mechanics of the `SSLHTTPDownloader`, a class that performs the task of downloading shards from remote web servers via SSL HTTP. The downloader utilizes a strand (`boost::asio::io_service::strand`) to ensure that downloads are never executed concurrently. Hence, if a download is in progress when another download is initiated, the second download will be queued and invoked only when the first download is completed. -## New Features +## Motivation -The downloader has been recently (March 2020) been modified to provide some key features: +In March 2020 the downloader was modified to include some key features: - The ability to stop downloads during a graceful shutdown. - The ability to resume partial downloads after a crash or shutdown. -- *(Deferred) The ability to download from multiple servers to a single file.* + +This document was created to document the changes introduced by this change. ## Classes @@ -42,17 +43,15 @@ Much of the shard downloading process concerns the following classes: start(); ``` - When a client submits a `download_shard` command via the RPC interface, each of the requested files is registered with the handler via the `add` method. After all the files have been registered, the handler's `start` method is invoked, which in turn creates an instance of the `SSLHTTPDownloader` and begins the first download. When the download is completed, the downloader invokes the handler's `complete` method, which will initiate the download of the next file, or simply return if there are no more downloads to process. When `complete` is invoked with no remaining files to be downloaded, the handler and downloader are not destroyed automatically, but persist for the duration of the application. 
- -Additionally, we leverage a novel class to provide customized parsing for downloaded files: + When a client submits a `download_shard` command via the RPC interface, each of the requested files is registered with the handler via the `add` method. After all the files have been registered, the handler's `start` method is invoked, which in turn creates an instance of the `SSLHTTPDownloader` and begins the first download. When the download is completed, the downloader invokes the handler's `complete` method, which will initiate the download of the next file, or simply return if there are no more downloads to process. When `complete` is invoked with no remaining files to be downloaded, the handler and downloader are not destroyed automatically, but persist for the duration of the application to assist with graceful shutdowns by `Stoppable`. - `DatabaseBody` - This class will define a custom message body type, allowing an `http::response_parser` to write to a SQLite database rather than to a flat file. This class is discussed in further detail in the Recovery section. + This class defines a custom message body type, allowing an `http::response_parser` to write to an SQLite database rather than to a flat file. This class is discussed in further detail in the Recovery section. -## Execution Concept +## Graceful Shutdowns & Recovery -This section describes in greater detail how the key features of the downloader are implemented in C++ using the `boost::asio` framework. +This section describes in greater detail how the shutdown and recovery features of the downloader are implemented in C++ using the `boost::asio` framework. 
##### Member Variables: @@ -65,7 +64,7 @@ using boost::asio::ip::tcp::socket; stream stream_; std::condition_variable c_; -std::atomic isStopped_; +std::atomic cancelDownloads_; ``` ### Graceful Shutdowns @@ -90,7 +89,7 @@ ShardArchiveHandler::onStop() } ``` -Inside of `SSLHTTPDownloader::onStop()`, if a download is currently in progress, the `isStopped_` member variable is set and the thread waits for the download to stop: +Inside of `SSLHTTPDownloader::onStop()`, if a download is currently in progress, the `cancelDownloads_` member variable is set and the thread waits for the download to stop: ```c++ void @@ -98,7 +97,7 @@ SSLHTTPDownloader::onStop() { std::unique_lock lock(m_); - isStopped_ = true; + cancelDownloads_ = true; if(sessionActive_) { @@ -114,28 +113,23 @@ SSLHTTPDownloader::onStop() ##### Thread 2: -The graceful shutdown is realized when the thread executing the download polls `isStopped_` after this variable has been set to `true`. Polling only occurs while the file is being downloaded, in between calls to `async_read_some()`. The stop takes effect when the socket is closed and the handler function ( `do_session()` ) is exited. +The graceful shutdown is realized when the thread executing the download polls `cancelDownloads_` after this variable has been set to `true`. Polling occurs while the file is being downloaded, in between calls to `async_read_some()`. The stop takes effect when the socket is closed and the handler function ( `do_session()` ) is exited. ```c++ void SSLHTTPDownloader::do_session() { - // (Connection initialization logic) + // (Connection initialization logic) . . . - . - . - . // (In between calls to async_read_some): - if(isStopped_.load()) + if(cancelDownloads_.load()) { close(p); return exit(); } - . - . - . + // . . . 
break; } @@ -143,11 +137,11 @@ void SSLHTTPDownloader::do_session() ### Recovery -Persisting the current state of both the archive handler and the downloader is achieved by leveraging a SQLite database rather than flat files, as the database protects against data corruption that could result from a system crash. +Persisting the current state of both the archive handler and the downloader is achieved by leveraging an SQLite database rather than flat files, as the database protects against data corruption that could result from a system crash. ##### ShardArchiveHandler -Although `SSLHTTPDownloader` is a generic class that could be used to download a variety of file types, currently it is used exclusively by the `ShardArchiveHandler` to download shards. In order to provide resilience, the `ShardArchiveHandler` will utilize a SQLite database to preserve its current state whenever there are active, paused, or queued downloads. The `shard_db` section in the configuration file allows users to specify the location of the database to use for this purpose. +Although `SSLHTTPDownloader` is a generic class that could be used to download a variety of file types, currently it is used exclusively by the `ShardArchiveHandler` to download shards. In order to provide resilience, the `ShardArchiveHandler` will use an SQLite database to preserve its current state whenever there are active, paused, or queued downloads. The `shard_db` section in the configuration file allows users to specify the location of the database to use for this purpose. ###### SQLite Table Format @@ -159,11 +153,11 @@ Although `SSLHTTPDownloader` is a generic class that could be used to download a ##### SSLHTTPDownloader -While the archive handler maintains a list of all partial and queued downloads, the `SSLHTTPDownloader` stores the raw bytes of the file currently being downloaded. The partially downloaded file will be represented as one or more `BLOB` entries in a SQLite database. 
As the maximum size of a `BLOB` entry is currently limited to roughly 2.1 GB, a 5 GB shard file for instance will occupy three database entries upon completion. +While the archive handler maintains a list of all partial and queued downloads, the `SSLHTTPDownloader` stores the raw bytes of the file currently being downloaded. The partially downloaded file will be represented as one or more `BLOB` entries in an SQLite database. As the maximum size of a `BLOB` entry is currently limited to roughly 2.1 GB, a 5 GB shard file for instance will occupy three database entries upon completion. ###### SQLite Table Format -Since downloads execute serially by design, the entries in this table always correspond to the content of a single file. +Since downloads execute serially by design, the entries in this table always correspond to the contents of a single file. | Bytes | Size | Part | |:------:|:----------:|:----:| @@ -172,7 +166,7 @@ Since downloads execute serially by design, the entries in this table always cor | 0x... | 705032706 | 2 | ##### Config File Entry -The `download_path` field of the `shard_db` entry will be used to determine where to store the recovery database. If this field is omitted, the `path` field will be used instead. +The `download_path` field of the `shard_db` entry is used to determine where to store the recovery database. If this field is omitted, the `path` field will be used instead. ```dosini # This is the persistent datastore for shards. It is important for the health @@ -187,7 +181,7 @@ max_size_gb=50 ``` ##### Resuming Partial Downloads -When resuming downloads after a crash or other interruption, the `SSLHTTPDownloader` will utilize the `range` field of the HTTP header to download only the remainder of the partially downloaded file. +When resuming downloads after a shutdown, crash, or other interruption, the `SSLHTTPDownloader` will utilize the `range` field of the HTTP header to download only the remainder of the partially downloaded file. 
```C++ auto downloaded = getPartialFileSize(); @@ -199,14 +193,14 @@ http::request req {http::verb::head, if (downloaded < total) { - // If we already download 1000 bytes to the partial file, + // If we already downloaded 1000 bytes to the database, // the range header will look like: // Range: "bytes=1000-" req.set(http::field::range, "bytes=" + to_string(downloaded) + "-"); } else if(downloaded == total) { - // Download is already complete. (Interruption Must + // Download is already complete. (Interruption must // have occurred after file was downloaded but before // the state file was updated.) } @@ -242,6 +236,7 @@ struct body } ``` +Note that the `DatabaseBody` class is specifically designed to work with `asio` and follows `asio` conventions. The method invoked to write data to the filesystem (or SQLite database in our case) has the following signature: @@ -252,7 +247,7 @@ body::reader::put(ConstBufferSequence const& buffers, error_code& ec); ## Sequence Diagram -This sequence diagram demonstrates a scenario wherein the `ShardArchiveHandler` leverages the state persisted in the database to recover from a crash and resume the scheduled downloads. +This sequence diagram demonstrates a scenario wherein the `ShardArchiveHandler` leverages the state persisted in the database to recover from a crash and resume the requested downloads. ![alt_text](./images/interrupt_sequence.png "Resuming downloads post abort") diff --git a/src/ripple/net/impl/DatabaseBody.ipp b/src/ripple/net/impl/DatabaseBody.ipp index d5b854f4b..d6bae7b47 100644 --- a/src/ripple/net/impl/DatabaseBody.ipp +++ b/src/ripple/net/impl/DatabaseBody.ipp @@ -27,11 +27,11 @@ DatabaseBody::value_type::close() // Stop all scheduled and currently // executing handlers before closing. 
- if (handler_count_) + if (handlerCount_) { closing_ = true; - auto predicate = [&] { return !handler_count_; }; + auto predicate = [&] { return !handlerCount_; }; c_.wait(lock, predicate); } @@ -76,12 +76,12 @@ DatabaseBody::value_type::open( // Continuing a file download. else { - boost::optional size; + boost::optional size; *db << "SELECT SUM(LENGTH(Data)) FROM Download;", soci::into(size); if (size) - file_size_ = size.get(); + fileSize_ = size.get(); } } } @@ -155,10 +155,10 @@ DatabaseBody::reader::put( { std::lock_guard lock(body_.m_); - if (body_.handler_count_ >= MAX_HANDLERS) + if (body_.handlerCount_ >= MAX_HANDLERS) post = false; else - ++body_.handler_count_; + ++body_.handlerCount_; } if (post) @@ -191,7 +191,7 @@ DatabaseBody::reader::do_put(std::string data) // The download is being halted. if (body_.closing_) { - if (--body_.handler_count_ == 0) + if (--body_.handlerCount_ == 0) { lock.unlock(); body_.c_.notify_one(); @@ -202,10 +202,10 @@ DatabaseBody::reader::do_put(std::string data) } auto path = body_.path_.string(); - uint64_t rowSize; + std::uint64_t rowSize = 0; soci::indicator rti; - uint64_t remainingInRow; + std::uint64_t remainingInRow = 0; auto db = body_.conn_->checkoutDb(); @@ -236,9 +236,9 @@ DatabaseBody::reader::do_put(std::string data) else remainingInRow = blobMaxSize - rowSize; - auto insert = [&db, &rowSize, &part = body_.part_, &fs = body_.file_size_]( + auto insert = [&db, &rowSize, &part = body_.part_, &fs = body_.fileSize_]( auto const& data) { - uint64_t updatedSize = rowSize + data.size(); + std::uint64_t updatedSize = rowSize + data.size(); *db << "UPDATE Download SET Data = CAST(Data || :data AS blob), " "Size = :size WHERE Part = :part;", @@ -263,7 +263,7 @@ DatabaseBody::reader::do_put(std::string data) bool const notify = [this] { std::lock_guard lock(body_.m_); - return --body_.handler_count_ == 0; + return --body_.handlerCount_ == 0; }(); if (notify) @@ -279,9 +279,9 @@ 
DatabaseBody::reader::finish(boost::system::error_code& ec) // Wait for scheduled DB writes // to complete. - if (body_.handler_count_) + if (body_.handlerCount_) { - auto predicate = [&] { return !body_.handler_count_; }; + auto predicate = [&] { return !body_.handlerCount_; }; body_.c_.wait(lock, predicate); } } diff --git a/src/ripple/net/impl/DatabaseDownloader.cpp b/src/ripple/net/impl/DatabaseDownloader.cpp index f96e0133c..e92365555 100644 --- a/src/ripple/net/impl/DatabaseDownloader.cpp +++ b/src/ripple/net/impl/DatabaseDownloader.cpp @@ -45,7 +45,7 @@ DatabaseDownloader::getParser( if (ec) { p->get().body().close(); - fail(dstPath, complete, ec, "open"); + fail(dstPath, complete, ec, "open", nullptr); } return p; @@ -69,7 +69,7 @@ DatabaseDownloader::closeBody(std::shared_ptr p) databaseBodyParser->get().body().close(); } -uint64_t +std::uint64_t DatabaseDownloader::size(std::shared_ptr p) { using namespace boost::beast; diff --git a/src/ripple/net/impl/SSLHTTPDownloader.cpp b/src/ripple/net/impl/SSLHTTPDownloader.cpp index 6988a1e93..26919331a 100644 --- a/src/ripple/net/impl/SSLHTTPDownloader.cpp +++ b/src/ripple/net/impl/SSLHTTPDownloader.cpp @@ -25,12 +25,11 @@ namespace ripple { SSLHTTPDownloader::SSLHTTPDownloader( boost::asio::io_service& io_service, beast::Journal j, - Config const& config, - bool isPaused) + Config const& config) : j_(j) , ssl_ctx_(config, j, boost::asio::ssl::context::tlsv12_client) , strand_(io_service) - , isStopped_(false) + , cancelDownloads_(false) , sessionActive_(false) { } @@ -47,10 +46,19 @@ SSLHTTPDownloader::download( if (!checkPath(dstPath)) return false; + { + std::lock_guard lock(m_); + + if (cancelDownloads_) + return true; + + sessionActive_ = true; + } + if (!strand_.running_in_this_thread()) strand_.post(std::bind( &SSLHTTPDownloader::download, - this->shared_from_this(), + this, host, port, target, @@ -62,7 +70,7 @@ SSLHTTPDownloader::download( strand_, std::bind( &SSLHTTPDownloader::do_session, - 
this->shared_from_this(), + this, host, port, target, @@ -78,7 +86,7 @@ SSLHTTPDownloader::onStop() { std::unique_lock lock(m_); - isStopped_ = true; + cancelDownloads_ = true; if (sessionActive_) { @@ -106,159 +114,6 @@ SSLHTTPDownloader::do_session( ////////////////////////////////////////////// // Define lambdas for encapsulating download // operations: - auto connect = [&](std::shared_ptr parser) { - uint64_t const rangeStart = size(parser); - - ip::tcp::resolver resolver{strand_.context()}; - auto const results = resolver.async_resolve(host, port, yield[ec]); - if (ec) - return fail(dstPath, complete, ec, "async_resolve", parser); - - try - { - stream_.emplace(strand_.context(), ssl_ctx_.context()); - } - catch (std::exception const& e) - { - return fail( - dstPath, - complete, - ec, - std::string("exception: ") + e.what(), - parser); - } - - ec = ssl_ctx_.preConnectVerify(*stream_, host); - if (ec) - return fail(dstPath, complete, ec, "preConnectVerify", parser); - - boost::asio::async_connect( - stream_->next_layer(), results.begin(), results.end(), yield[ec]); - if (ec) - return fail(dstPath, complete, ec, "async_connect", parser); - - ec = ssl_ctx_.postConnectVerify(*stream_, host); - if (ec) - return fail(dstPath, complete, ec, "postConnectVerify", parser); - - stream_->async_handshake(ssl::stream_base::client, yield[ec]); - if (ec) - return fail(dstPath, complete, ec, "async_handshake", parser); - - // Set up an HTTP HEAD request message to find the file size - http::request req{http::verb::head, target, version}; - req.set(http::field::host, host); - req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); - - // Requesting a portion of the file - if (rangeStart) - { - req.set( - http::field::range, - (boost::format("bytes=%llu-") % rangeStart).str()); - } - - http::async_write(*stream_, req, yield[ec]); - if (ec) - return fail(dstPath, complete, ec, "async_write", parser); - - { - // Check if available storage for file size - http::response_parser 
p; - p.skip(true); - http::async_read(*stream_, read_buf_, p, yield[ec]); - if (ec) - return fail(dstPath, complete, ec, "async_read", parser); - - // Range request was rejected - if (p.get().result() == http::status::range_not_satisfiable) - { - req.erase(http::field::range); - - http::async_write(*stream_, req, yield[ec]); - if (ec) - return fail( - dstPath, - complete, - ec, - "async_write_range_verify", - parser); - - http::response_parser p; - p.skip(true); - - http::async_read(*stream_, read_buf_, p, yield[ec]); - if (ec) - return fail( - dstPath, - complete, - ec, - "async_read_range_verify", - parser); - - // The entire file is downloaded already. - if (p.content_length() == rangeStart) - skip = true; - else - return fail( - dstPath, complete, ec, "range_not_satisfiable", parser); - } - else if ( - rangeStart && p.get().result() != http::status::partial_content) - { - ec.assign( - boost::system::errc::not_supported, - boost::system::generic_category()); - - return fail( - dstPath, complete, ec, "Range request ignored", parser); - } - else if (auto len = p.content_length()) - { - try - { - if (*len > space(dstPath.parent_path()).available) - { - return fail( - dstPath, - complete, - ec, - "Insufficient disk space for download", - parser); - } - } - catch (std::exception const& e) - { - return fail( - dstPath, - complete, - ec, - std::string("exception: ") + e.what(), - parser); - } - } - } - - if (!skip) - { - // Set up an HTTP GET request message to download the file - req.method(http::verb::get); - - if (rangeStart) - { - req.set( - http::field::range, - (boost::format("bytes=%llu-") % rangeStart).str()); - } - } - - http::async_write(*stream_, req, yield[ec]); - if (ec) - return fail(dstPath, complete, ec, "async_write", parser); - - return true; - }; - auto close = [&](auto p) { closeBody(p); @@ -276,14 +131,6 @@ SSLHTTPDownloader::do_session( stream_ = boost::none; }; - auto getParser = [&] { - auto p = this->getParser(dstPath, complete, ec); - if (ec) 
- fail(dstPath, complete, ec, "getParser", p); - - return p; - }; - // When the downloader is being stopped // because the server is shutting down, // this method notifies a 'Stoppable' @@ -294,23 +141,151 @@ SSLHTTPDownloader::do_session( c_.notify_one(); }; + auto failAndExit = [&exit, &dstPath, complete, &ec, this]( + std::string const& errMsg, auto p) { + exit(); + fail(dstPath, complete, ec, errMsg, p); + }; // end lambdas //////////////////////////////////////////////////////////// + if (cancelDownloads_.load()) + return exit(); + + auto p = this->getParser(dstPath, complete, ec); + if (ec) + return failAndExit("getParser", p); + + ////////////////////////////////////////////// + // Prepare for download and establish the + // connection: + std::uint64_t const rangeStart = size(p); + + ip::tcp::resolver resolver{strand_.context()}; + auto const results = resolver.async_resolve(host, port, yield[ec]); + if (ec) + return failAndExit("async_resolve", p); + + try { - std::lock_guard lock(m_); - sessionActive_ = true; + stream_.emplace(strand_.context(), ssl_ctx_.context()); + } + catch (std::exception const& e) + { + return failAndExit(std::string("exception: ") + e.what(), p); } - if (isStopped_.load()) - return exit(); - - auto p = getParser(); + ec = ssl_ctx_.preConnectVerify(*stream_, host); if (ec) - return exit(); + return failAndExit("preConnectVerify", p); - if (!connect(p) || ec) - return exit(); + boost::asio::async_connect( + stream_->next_layer(), results.begin(), results.end(), yield[ec]); + if (ec) + return failAndExit("async_connect", p); + + ec = ssl_ctx_.postConnectVerify(*stream_, host); + if (ec) + return failAndExit("postConnectVerify", p); + + stream_->async_handshake(ssl::stream_base::client, yield[ec]); + if (ec) + return failAndExit("async_handshake", p); + + // Set up an HTTP HEAD request message to find the file size + http::request req{http::verb::head, target, version}; + req.set(http::field::host, host); + 
req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); + + // Requesting a portion of the file + if (rangeStart) + { + req.set( + http::field::range, + (boost::format("bytes=%llu-") % rangeStart).str()); + } + + http::async_write(*stream_, req, yield[ec]); + if (ec) + return failAndExit("async_write", p); + + { + // Read the response + http::response_parser connectParser; + connectParser.skip(true); + http::async_read(*stream_, read_buf_, connectParser, yield[ec]); + if (ec) + return failAndExit("async_read", p); + + // Range request was rejected + if (connectParser.get().result() == http::status::range_not_satisfiable) + { + req.erase(http::field::range); + + http::async_write(*stream_, req, yield[ec]); + if (ec) + return failAndExit("async_write_range_verify", p); + + http::response_parser rangeParser; + rangeParser.skip(true); + + http::async_read(*stream_, read_buf_, rangeParser, yield[ec]); + if (ec) + return failAndExit("async_read_range_verify", p); + + // The entire file is downloaded already. 
+ if (rangeParser.content_length() == rangeStart) + skip = true; + else + return failAndExit("range_not_satisfiable", p); + } + else if ( + rangeStart && + connectParser.get().result() != http::status::partial_content) + { + ec.assign( + boost::system::errc::not_supported, + boost::system::generic_category()); + + return failAndExit("Range request ignored", p); + } + else if (auto len = connectParser.content_length()) + { + try + { + // Ensure sufficient space is available + if (*len > space(dstPath.parent_path()).available) + { + return failAndExit( + "Insufficient disk space for download", p); + } + } + catch (std::exception const& e) + { + return failAndExit(std::string("exception: ") + e.what(), p); + } + } + } + + if (!skip) + { + // Set up an HTTP GET request message to download the file + req.method(http::verb::get); + + if (rangeStart) + { + req.set( + http::field::range, + (boost::format("bytes=%llu-") % rangeStart).str()); + } + } + + http::async_write(*stream_, req, yield[ec]); + if (ec) + return failAndExit("async_write", p); + + // end prepare and connect + //////////////////////////////////////////////////////////// if (skip) p->skip(true); @@ -318,7 +293,7 @@ SSLHTTPDownloader::do_session( // Download the file while (!p->is_done()) { - if (isStopped_.load()) + if (cancelDownloads_.load()) { close(p); return exit(); @@ -336,7 +311,7 @@ SSLHTTPDownloader::do_session( complete(std::move(dstPath)); } -bool +void SSLHTTPDownloader::fail( boost::filesystem::path dstPath, std::function const& complete, @@ -366,8 +341,6 @@ SSLHTTPDownloader::fail( << " in function: " << __func__; } complete(std::move(dstPath)); - - return false; } } // namespace ripple diff --git a/src/ripple/nodestore/RetryFinalize.h b/src/ripple/nodestore/RetryFinalize.h deleted file mode 100644 index c115e2102..000000000 --- a/src/ripple/nodestore/RetryFinalize.h +++ /dev/null @@ -1,61 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This 
file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2020 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_NODESTORE_RETRYFINALIZE_H_INCLUDED -#define RIPPLE_NODESTORE_RETRYFINALIZE_H_INCLUDED - -#include -#include - -namespace ripple { -namespace NodeStore { - -class RetryFinalize -{ -public: - using retryFunction = std::function; - - RetryFinalize() = default; - - bool - retry(Application& app, retryFunction f, std::uint32_t shardIndex); - - // Must match the imported shard's last ledger hash - uint256 referenceHash{0}; - -private: - using waitable_timer = - boost::asio::basic_waitable_timer; - - static constexpr std::chrono::seconds retryInterval_ = - std::chrono::seconds{60}; - - // Maximum attempts to retrieve a shard's last ledger hash - static constexpr uint32_t maxAttempts_{5}; - - std::unique_ptr timer_; - - // Number of attempts to retrieve a shard's last ledger hash - std::uint32_t numAttempts_{0}; -}; - -} // namespace NodeStore -} // namespace ripple - -#endif // RIPPLE_NODESTORE_RETRYFINALIZE_H_INCLUDED diff --git a/src/ripple/nodestore/ShardValidation.md b/src/ripple/nodestore/ShardValidation.md index ec66f0545..229a46c88 100644 --- 
a/src/ripple/nodestore/ShardValidation.md +++ b/src/ripple/nodestore/ShardValidation.md @@ -2,129 +2,32 @@ ## Overview -In order to validate shards that have been downloaded from file servers (as opposed to shards acquired from peers), the application must confirm the validity of the downloaded shard's last ledger. The following sections describe this confirmation process in greater detail. +In order to validate shards that have been downloaded from file servers (as opposed to shards acquired from peers), the application must confirm the validity of the downloaded shard's last ledger. So before initiating the download, we first confirm that we are able to retrieve the shard's last ledger hash. The following sections describe this confirmation process in greater detail. -## Execution Concept +## Hash Verification ### Flag Ledger -Since the number of ledgers contained in each shard is always a multiple of 256, a shard's last ledger is always a flag ledger. Conveniently, the application provides a mechanism for retrieving the hash for a given flag ledger: +Since the number of ledgers contained in each shard is always a multiple of 256, a shard's last ledger is always a flag ledger. Conveniently, the skip list stored within a ledger will provide us with a series of flag ledger hashes, enabling the software to corroborate a shard's last ledger hash. We access the skip list by calling `LedgerMaster::walkHashBySeq` and providing the sequence of a shard's last ledger: ```C++ -boost::optional -hashOfSeq (ReadView const& ledger, - LedgerIndex seq, - beast::Journal journal) +boost::optional expectedHash; +expectedHash = + app_.getLedgerMaster().walkHashBySeq(lastLedgerSeq(shardIndex)); ``` -When validating downloaded shards, we use this function to retrieve the hash of the shard's last ledger. If the function returns a hash that matches the hash stored in the shard, validation of the shard can proceed. 
+When a user requests a shard download, the `ShardArchiveHandler` will first use this function to retrieve the hash of the shard's last ledger. If the function returns a hash, downloading the shard can proceed. Once the download completes, the server can reliably retrieve this last ledger hash to complete validation of the shard. ### Caveats #### Later Ledger -The `getHashBySeq` function will provide the hash of a flag ledger only if the application has stored a later ledger. When validating a downloaded shard, if there is no later ledger stored, validation of the shard will be deferred until a later ledger has been stored. +The `walkHashBySeq` function will provide the hash of a flag ledger only if the application has stored a later ledger. When verifying the last ledger hash of a pending shard download, if there is no later ledger stored, the download will be deferred until a later ledger has been stored. -We employ a simple heuristic for determining whether the application has stored a ledger later than the last ledger of the downloaded shard: - -```C++ -// We use the presence (or absense) of the validated -// ledger as a heuristic for determining whether or -// not we have stored a ledger that comes after the -// last ledger in this shard. A later ledger must -// be present in order to reliably retrieve the hash -// of the shard's last ledger. -if (app_.getLedgerMaster().getValidatedLedger()) -{ - auto const hash = app_.getLedgerMaster().getHashBySeq( - lastLedgerSeq(shardIndex)); - - . - . - . -} -``` - -The `getHashBySeq` function will be invoked only when a call to `LedgerMaster::getValidatedLedger` returns a validated ledger, rather than a `nullptr`. Otherwise validation of the shard will be deferred. +We use the presence (or absence) of a validated ledger with a sequence number later than the sequence of the shard's last ledger as a heuristic for determining whether or not we should have the shard's last ledger hash. 
A later ledger must be present in order to reliably retrieve the hash of the shard's last ledger. The hash will only be retrieved when a later ledger is present. Otherwise verification of the shard will be deferred. ### Retries #### Retry Limit -If the server must defer shard validation, the software will initiate a timer that upon expiration, will re-attempt confirming the last ledger hash. We place an upper limit on the number of attempts the server makes to achieve this confirmation. When the maximum number of attempts has been reached, validation of the shard will fail, resulting in the removal of the shard. An attempt counts toward the limit only when we are able to get a validated ledger (implying a current view of the network), but are unable to retrieve the last ledger hash. Retries that occur because no validated ledger was available are not counted. - -#### ShardInfo - -The `DatabaseShardImp` class stores a container of `ShardInfo` structs, each of which contains information pertaining to a shard held by the server. These structs will be used during shard import to store the last ledger hash (when available) and to track the number of hash confirmation attempts that have been made. - -```C++ -struct ShardInfo -{ - . - . - . - - // Used to limit the number of times we attempt - // to retrieve a shard's last ledger hash, when - // the hash should have been found. See - // scheduleFinalizeShard(). Once this limit has - // been exceeded, the shard has failed and is - // removed. - bool - attemptHashConfirmation() - { - if (lastLedgerHashAttempts + 1 <= maxLastLedgerHashAttempts) - { - ++lastLedgerHashAttempts; - return true; - } - - return false; - } - - // This variable is used during the validation - // of imported shards and must match the - // imported shard's last ledger hash. - uint256 lastLedgerHash; - - // The number of times we've attempted to - // confirm this shard's last ledger hash. 
- uint16_t lastLedgerHashAttempts; - - // The upper limit on attempts to confirm - // the shard's last ledger hash. - static const uint8_t maxLastLedgerHashAttempts = 5; -}; -``` - -### Shard Import - -Once a shard has been successfully downloaded by the `ShardArchiveHandler`, this class invokes the `importShard` method on the shard database: - -```C++ -bool -DatabaseShardImp::importShard( - std::uint32_t shardIndex, - boost::filesystem::path const& srcDir) -``` - -At the end of this method, `DatabaseShardImp::finalizeShard` is invoked which begins validation of the downloaded shard. This will be changed so that instead, the software first creates a task to confirm the last ledger hash. Upon the successful completion of this task, shard validation will begin. - -```C++ -bool -DatabaseShardImp::importShard( - std::uint32_t shardIndex, - boost::filesystem::path const& srcDir) -{ - . - . - . - - taskQueue_->addTask([this]() - { - // Verify hash. - // Invoke DatabaseShardImp::finalizeShard on success. - // Defer task if necessary. - }); -} -``` +If the server must defer hash verification, the software will initiate a timer that upon expiration, will re-attempt verifying the last ledger hash. We place an upper limit on the number of attempts the server makes to achieve this verification. When the maximum number of attempts has been reached, the download request will fail, and the `ShardArchiveHandler` will proceed with any remaining downloads. An attempt counts toward the limit only when we are able to get a later validated ledger (implying a current view of the network), but are unable to retrieve the last ledger hash. Retries that occur because no validated ledger was available are not counted. 
diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 5b559f394..8f5556499 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -156,7 +156,8 @@ DatabaseShardImp::init() auto const result{shards_.emplace( shardIndex, ShardInfo(std::move(shard), ShardInfo::State::none))}; - finalizeShard(result.first->second, true, lock); + finalizeShard( + result.first->second, true, lock, boost::none); } else { @@ -355,6 +356,16 @@ DatabaseShardImp::importShard( return false; } + auto expectedHash = + app_.getLedgerMaster().walkHashBySeq(lastLedgerSeq(shardIndex)); + + if (!expectedHash) + { + JLOG(j_.error()) << "shard " << shardIndex + << " expected hash not found"; + return false; + } + auto renameDir = [&](path const& src, path const& dst) { try { @@ -376,8 +387,7 @@ DatabaseShardImp::importShard( // Check shard is prepared if (auto const it{shards_.find(shardIndex)}; it == shards_.end() || - it->second.shard || it->second.state != ShardInfo::State::import || - it->second.retryFinalize) + it->second.shard || it->second.state != ShardInfo::State::import) { JLOG(j_.error()) << "shard " << shardIndex << " failed to import"; return false; @@ -404,8 +414,7 @@ DatabaseShardImp::importShard( std::lock_guard lock(mutex_); auto const it{shards_.find(shardIndex)}; if (it == shards_.end() || it->second.shard || - it->second.state != ShardInfo::State::import || - it->second.retryFinalize) + it->second.state != ShardInfo::State::import) { JLOG(j_.error()) << "shard " << shardIndex << " failed to import"; shard.reset(); @@ -414,10 +423,9 @@ DatabaseShardImp::importShard( } it->second.shard = std::move(shard); - it->second.retryFinalize = std::make_unique(); + finalizeShard(it->second, true, lock, expectedHash); } - finalizeWithRefHash(shardIndex); return true; } @@ -611,9 +619,6 @@ DatabaseShardImp::onStop() { if (e.second.shard) e.second.shard->stop(); - - if 
(e.second.retryFinalize) - e.second.retryFinalize.reset(); } shards_.clear(); } @@ -813,7 +818,8 @@ DatabaseShardImp::import(Database& source) auto const result{shards_.emplace( shardIndex, ShardInfo(std::move(shard), ShardInfo::State::none))}; - finalizeShard(result.first->second, true, lock); + finalizeShard( + result.first->second, true, lock, boost::none); } catch (std::exception const& e) { @@ -1185,94 +1191,12 @@ DatabaseShardImp::findAcquireIndex( return boost::none; } -void -DatabaseShardImp::finalizeWithRefHash(std::uint32_t shardIndex) -{ - // We use the presence (or absence) of the validated - // ledger as a heuristic for determining whether or - // not we have stored a ledger that comes after the - // last ledger in this shard. A later ledger must - // be present in order to reliably retrieve the hash - // of the shard's last ledger. - boost::optional referenceHash; - if (app_.getLedgerMaster().getValidatedLedger()) - { - referenceHash = - app_.getLedgerMaster().walkHashBySeq(lastLedgerSeq(shardIndex)); - } - - // Make sure the shard was found in an - // expected state. - auto confirmShard = [this, shardIndex]( - auto const it, std::lock_guard&) { - if (it == shards_.end() || - it->second.state != ShardInfo::State::import || - !it->second.retryFinalize) - { - JLOG(j_.error()) << "shard " << shardIndex << " failed to import"; - return false; - } - - return true; - }; - - // The node is shutting down; remove the shard - // and return. 
- if (isStopping()) - { - std::shared_ptr shard; - - { - std::lock_guard lock(mutex_); - auto const it{shards_.find(shardIndex)}; - - if(!confirmShard(it, lock)) - return; - - shard = it->second.shard; - } - - JLOG(j_.warn()) - << "shard " << shardIndex - << " will not be imported due to system shutdown, removing"; - - removeFailedShard(shard); - return; - } - - std::lock_guard lock(mutex_); - auto const it{shards_.find(shardIndex)}; - - if(!confirmShard(it, lock)) - return; - - if (referenceHash && referenceHash->isNonZero()) - { - it->second.retryFinalize->referenceHash = *referenceHash; - finalizeShard(it->second, true, lock); - return; - } - - // Failed to find a reference hash, schedule to try again - if (!it->second.retryFinalize->retry( - app_, - std::bind( - &DatabaseShardImp::finalizeWithRefHash, - this, - std::placeholders::_1), - shardIndex)) - { - JLOG(j_.error()) << "shard " << shardIndex - << " failed to import, maximum attempts reached"; - removeFailedShard(it->second.shard); - } -} - void DatabaseShardImp::finalizeShard( ShardInfo& shardInfo, bool writeSQLite, - std::lock_guard&) + std::lock_guard&, + boost::optional const& expectedHash) { assert(shardInfo.shard); assert(shardInfo.shard->index() != acquireIndex_); @@ -1282,19 +1206,16 @@ DatabaseShardImp::finalizeShard( auto const shardIndex{shardInfo.shard->index()}; shardInfo.state = ShardInfo::State::finalize; - taskQueue_->addTask([this, shardIndex, writeSQLite]() { + taskQueue_->addTask([this, shardIndex, writeSQLite, expectedHash]() { if (isStopping()) return; std::shared_ptr shard; - boost::optional referenceHash; { std::lock_guard lock(mutex_); - if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + if (auto const it{shards_.find(shardIndex)}; it != shards_.end()) { shard = it->second.shard; - if (it->second.retryFinalize) - *referenceHash = it->second.retryFinalize->referenceHash; } else { @@ -1303,7 +1224,7 @@ DatabaseShardImp::finalizeShard( } } - if 
(!shard->finalize(writeSQLite, referenceHash)) + if (!shard->finalize(writeSQLite, expectedHash)) { if (isStopping()) return; @@ -1466,7 +1387,7 @@ DatabaseShardImp::storeLedgerInShard( acquireIndex_ = 0; if (it->second.state != ShardInfo::State::finalize) - finalizeShard(it->second, false, lock); + finalizeShard(it->second, false, lock, boost::none); } else { @@ -1480,17 +1401,15 @@ DatabaseShardImp::storeLedgerInShard( } void -DatabaseShardImp::removeFailedShard( - std::shared_ptr shard) +DatabaseShardImp::removeFailedShard(std::shared_ptr shard) { { std::lock_guard lock(mutex_); - shards_.erase(shard->index()); if (shard->index() == acquireIndex_) acquireIndex_ = 0; - if(shard->isFinal()) + if ((shards_.erase(shard->index()) > 0) && shard->isFinal()) updateStatus(lock); } diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index a7b12b1da..85aec3642 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -23,7 +23,6 @@ #include #include #include -#include #include @@ -192,9 +191,6 @@ private: std::shared_ptr shard; State state{State::none}; - - // Used during the validation of imported shards - std::unique_ptr retryFinalize; }; Application& app_; @@ -267,12 +263,6 @@ private: std::uint32_t validLedgerSeq, std::lock_guard&); -public: - // Attempts to retrieve a reference last ledger hash - // for a shard and finalize it - void - finalizeWithRefHash(std::uint32_t shardIndex); - private: // Queue a task to finalize a shard by validating its databases // Lock must be held @@ -280,7 +270,8 @@ private: finalizeShard( ShardInfo& shardInfo, bool writeSQLite, - std::lock_guard&); + std::lock_guard&, + boost::optional const& expectedHash); // Set storage and file descriptor usage stats // Lock must NOT be held diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index d4956943a..71f2af65a 100644 --- 
a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -396,7 +396,7 @@ Shard::isLegacy() const bool Shard::finalize( bool const writeSQLite, - boost::optional const& referenceHash) + boost::optional const& expectedHash) { assert(backend_); @@ -502,7 +502,7 @@ Shard::finalize( // Validate the last ledger hash of a downloaded shard // using a ledger hash obtained from the peer network - if (referenceHash && *referenceHash != hash) + if (expectedHash && *expectedHash != hash) return fail("invalid last ledger hash"); // Validate every ledger stored in the backend diff --git a/src/ripple/rpc/ShardArchiveHandler.h b/src/ripple/rpc/ShardArchiveHandler.h index 6f65610ae..38e62eef8 100644 --- a/src/ripple/rpc/ShardArchiveHandler.h +++ b/src/ripple/rpc/ShardArchiveHandler.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -34,38 +35,33 @@ class ShardArchiveHandler_test; } namespace RPC { -/** Handles the download and import one or more shard archives. */ -class ShardArchiveHandler - : public Stoppable, - public std::enable_shared_from_this +/** Handles the download and import of one or more shard archives. */ +class ShardArchiveHandler : public Stoppable { public: - using pointer = std::shared_ptr; + using TimerOpCounter = + ClosureCounter; friend class test::ShardArchiveHandler_test; static boost::filesystem::path getDownloadDirectory(Config const& config); - static pointer - getInstance(); + static std::unique_ptr + makeShardArchiveHandler(Application& app, Stoppable& parent); - static pointer - getInstance(Application& app, Stoppable& parent); + // Create a ShardArchiveHandler only if + // the state database is present, indicating + // that recovery is needed. 
+ static std::unique_ptr + tryMakeRecoveryHandler(Application& app, Stoppable& parent); - static pointer - recoverInstance(Application& app, Stoppable& parent); + ShardArchiveHandler(Application& app, Stoppable& parent); - static bool - hasInstance(); + virtual ~ShardArchiveHandler() = default; - bool + [[nodiscard]] bool init(); - bool - initFromDB(); - - ~ShardArchiveHandler() = default; - bool add(std::uint32_t shardIndex, std::pair&& url); @@ -84,10 +80,8 @@ private: ShardArchiveHandler& operator=(ShardArchiveHandler const&) = delete; - ShardArchiveHandler( - Application& app, - Stoppable& parent, - bool recovery = false); + [[nodiscard]] bool + initFromDB(std::lock_guard const&); void onStop() override; @@ -105,7 +99,7 @@ private: // Begins the download and import of the next archive. bool - next(std::lock_guard& l); + next(std::lock_guard const& l); // Callback used by the downloader to notify completion of a download. void @@ -117,23 +111,57 @@ private: // Remove the archive being processed. void - remove(std::lock_guard&); + remove(std::lock_guard const&); void doRelease(std::lock_guard const&); - static std::mutex instance_mutex_; - static pointer instance_; + bool + onClosureFailed( + std::string const& errorMsg, + std::lock_guard const& lock); + bool + removeAndProceed(std::lock_guard const& lock); + + ///////////////////////////////////////////////// + // m_ is used to protect access to downloader_, + // archives_, process_ and to protect setting and + // destroying sqliteDB_. 
+ ///////////////////////////////////////////////// std::mutex mutable m_; + std::unique_ptr downloader_; + std::map archives_; + bool process_; + std::unique_ptr sqliteDB_; + ///////////////////////////////////////////////// + Application& app_; beast::Journal const j_; - std::unique_ptr sqliteDB_; - std::shared_ptr downloader_; boost::filesystem::path const downloadDir_; boost::asio::basic_waitable_timer timer_; - bool process_; - std::map archives_; + JobCounter jobCounter_; + TimerOpCounter timerCounter_; + ShardVerificationScheduler verificationScheduler_; +}; + +//////////////////////////////////////////////////////////////////// +// The RecoveryHandler is an empty class that is constructed by +// the application when the ShardArchiveHandler's state database +// is present at application start, indicating that the handler +// needs to perform recovery. However, if recovery isn't needed +// at application start, and the user subsequently submits a request +// to download shards, we construct a ShardArchiveHandler rather +// than a RecoveryHandler to process the request. With this approach, +// type verification can be employed to determine whether the +// ShardArchiveHandler was constructed in recovery mode by the +// application, or as a response to a user submitting a request to +// download shards. +//////////////////////////////////////////////////////////////////// +class RecoveryHandler : public ShardArchiveHandler +{ +public: + RecoveryHandler(Application& app, Stoppable& parent); }; } // namespace RPC diff --git a/src/ripple/rpc/ShardVerificationScheduler.h b/src/ripple/rpc/ShardVerificationScheduler.h new file mode 100644 index 000000000..659b3e904 --- /dev/null +++ b/src/ripple/rpc/ShardVerificationScheduler.h @@ -0,0 +1,84 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. 
+ + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_RPC_SHARDVERIFICATIONSCHEDULER_H_INCLUDED +#define RIPPLE_RPC_SHARDVERIFICATIONSCHEDULER_H_INCLUDED + +#include +#include + +namespace ripple { +namespace RPC { + +class ShardVerificationScheduler +{ +public: + // This is the signature of the function that the client + // wants to have invoked upon timer expiration. The function + // should check the error code 'ec' and abort the function + // if the timer was cancelled: + // (ec == boost::asio::error::operation_aborted). + // In the body of the function, the client should perform + // the necessary verification. + using retryFunction = + std::function; + + ShardVerificationScheduler() = default; + + ShardVerificationScheduler( + std::chrono::seconds retryInterval, + std::uint32_t maxAttempts); + + bool + retry(Application& app, bool shouldHaveHash, retryFunction f); + + void + reset(); + +private: + using waitable_timer = + boost::asio::basic_waitable_timer; + + ///////////////////////////////////////////////////// + // NOTE: retryInterval_ and maxAttempts_ were chosen + // semi-arbitrarily and experimenting with other + // values might prove useful. 
+ ///////////////////////////////////////////////////// + + static constexpr std::chrono::seconds defaultRetryInterval_{60}; + + static constexpr std::uint32_t defaultMaxAttempts_{5}; + + // The number of seconds to wait before retrying + // retrieval of a shard's last ledger hash + const std::chrono::seconds retryInterval_{defaultRetryInterval_}; + + // Maximum attempts to retrieve a shard's last ledger hash + const std::uint32_t maxAttempts_{defaultMaxAttempts_}; + + std::unique_ptr timer_; + + // Number of attempts to retrieve a shard's last ledger hash + std::uint32_t numAttempts_{0}; +}; + +} // namespace RPC +} // namespace ripple + +#endif // RIPPLE_RPC_SHARDVERIFICATIONSCHEDULER_H_INCLUDED diff --git a/src/ripple/rpc/handlers/DownloadShard.cpp b/src/ripple/rpc/handlers/DownloadShard.cpp index 5502c5631..0e627d585 100644 --- a/src/ripple/rpc/handlers/DownloadShard.cpp +++ b/src/ripple/rpc/handlers/DownloadShard.cpp @@ -60,7 +60,7 @@ doDownloadShard(RPC::JsonContext& context) auto preShards{shardStore->getPreShards()}; if (!preShards.empty()) { - std::string s{"Download in progress. Shard"}; + std::string s{"Download already in progress. Shard"}; if (!std::all_of(preShards.begin(), preShards.end(), ::isdigit)) s += "s"; return RPC::makeObjectValue(s + " " + preShards); @@ -129,22 +129,15 @@ doDownloadShard(RPC::JsonContext& context) } } - RPC::ShardArchiveHandler::pointer handler; + RPC::ShardArchiveHandler* handler = nullptr; try { - handler = RPC::ShardArchiveHandler::hasInstance() - ?
RPC::ShardArchiveHandler::getInstance() - : RPC::ShardArchiveHandler::getInstance( - context.app, context.app.getJobQueue()); + handler = context.app.getShardArchiveHandler(); if (!handler) return RPC::make_error( rpcINTERNAL, "Failed to create ShardArchiveHandler."); - - if (!handler->init()) - return RPC::make_error( - rpcINTERNAL, "Failed to initiate ShardArchiveHandler."); } catch (std::exception const& e) { diff --git a/src/ripple/rpc/impl/ShardArchiveHandler.cpp b/src/ripple/rpc/impl/ShardArchiveHandler.cpp index 25ae1a91e..54a0dd635 100644 --- a/src/ripple/rpc/impl/ShardArchiveHandler.cpp +++ b/src/ripple/rpc/impl/ShardArchiveHandler.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include @@ -34,9 +33,6 @@ namespace RPC { using namespace boost::filesystem; using namespace std::chrono_literals; -std::mutex ShardArchiveHandler::instance_mutex_; -ShardArchiveHandler::pointer ShardArchiveHandler::instance_ = nullptr; - boost::filesystem::path ShardArchiveHandler::getDownloadDirectory(Config const& config) { @@ -48,89 +44,97 @@ ShardArchiveHandler::getDownloadDirectory(Config const& config) "download"; } -auto -ShardArchiveHandler::getInstance() -> pointer -{ - std::lock_guard lock(instance_mutex_); - - return instance_; -} - -auto -ShardArchiveHandler::getInstance(Application& app, Stoppable& parent) -> pointer -{ - std::lock_guard lock(instance_mutex_); - assert(!instance_); - - instance_.reset(new ShardArchiveHandler(app, parent)); - - return instance_; -} - -auto -ShardArchiveHandler::recoverInstance(Application& app, Stoppable& parent) - -> pointer -{ - std::lock_guard lock(instance_mutex_); - assert(!instance_); - - instance_.reset(new ShardArchiveHandler(app, parent, true)); - - return instance_; -} - -bool -ShardArchiveHandler::hasInstance() -{ - std::lock_guard lock(instance_mutex_); - - return instance_.get() != nullptr; -} - -ShardArchiveHandler::ShardArchiveHandler( +std::unique_ptr +ShardArchiveHandler::makeShardArchiveHandler( 
Application& app, - Stoppable& parent, - bool recovery) + Stoppable& parent) +{ + return std::make_unique(app, parent); +} + +std::unique_ptr +ShardArchiveHandler::tryMakeRecoveryHandler(Application& app, Stoppable& parent) +{ + auto const downloadDir(getDownloadDirectory(app.config())); + + // Create the handler iff the database + // is present. + if (exists(downloadDir / stateDBName) && + is_regular_file(downloadDir / stateDBName)) + { + return std::make_unique(app, parent); + } + + return nullptr; +} + +ShardArchiveHandler::ShardArchiveHandler(Application& app, Stoppable& parent) : Stoppable("ShardArchiveHandler", parent) + , process_(false) , app_(app) , j_(app.journal("ShardArchiveHandler")) , downloadDir_(getDownloadDirectory(app.config())) , timer_(app_.getIOService()) - , process_(false) + , verificationScheduler_( + std::chrono::seconds(get( + app.config().section(ConfigSection::shardDatabase()), + "shard_verification_retry_interval")), + + get( + app.config().section(ConfigSection::shardDatabase()), + "shard_verification_max_attempts")) { assert(app_.getShardStore()); - - if (recovery) - downloader_.reset( - new DatabaseDownloader(app_.getIOService(), j_, app_.config())); } bool ShardArchiveHandler::init() { - try - { - create_directories(downloadDir_); + std::lock_guard lock(m_); - sqliteDB_ = std::make_unique( - downloadDir_, - stateDBName, - DownloaderDBPragma, - ShardArchiveHandlerDBInit); - } - catch (std::exception const& e) + if (process_ || downloader_ != nullptr || sqliteDB_ != nullptr) { - JLOG(j_.error()) << "exception: " << e.what() - << " in function: " << __func__; - + JLOG(j_.warn()) << "Archives already being processed"; return false; } + // Initialize from pre-existing database + if (exists(downloadDir_ / stateDBName) && + is_regular_file(downloadDir_ / stateDBName)) + { + downloader_.reset( + new DatabaseDownloader(app_.getIOService(), j_, app_.config())); + + return initFromDB(lock); + } + + // Fresh initialization + else + { + try + { 
+ create_directories(downloadDir_); + + sqliteDB_ = std::make_unique( + downloadDir_, + stateDBName, + DownloaderDBPragma, + ShardArchiveHandlerDBInit); + } + catch (std::exception const& e) + { + JLOG(j_.error()) + << "exception: " << e.what() << " in function: " << __func__; + + return false; + } + } + return true; } bool -ShardArchiveHandler::initFromDB() +ShardArchiveHandler::initFromDB(std::lock_guard const& lock) { try { @@ -151,8 +155,6 @@ ShardArchiveHandler::initFromDB() soci::rowset rs = (session.prepare << "SELECT * FROM State;"); - std::lock_guard lock(m_); - for (auto it = rs.begin(); it != rs.end(); ++it) { parsedURL url; @@ -190,14 +192,24 @@ ShardArchiveHandler::initFromDB() void ShardArchiveHandler::onStop() { - std::lock_guard lock(m_); - - if (downloader_) { - downloader_->onStop(); - downloader_.reset(); + std::lock_guard lock(m_); + + if (downloader_) + { + downloader_->onStop(); + downloader_.reset(); + } + + timer_.cancel(); } + jobCounter_.join( + "ShardArchiveHandler", std::chrono::milliseconds(2000), j_); + + timerCounter_.join( + "ShardArchiveHandler", std::chrono::milliseconds(2000), j_); + stopped(); } @@ -271,7 +283,7 @@ ShardArchiveHandler::start() if (!downloader_) { // will throw if can't initialize ssl context - downloader_ = std::make_shared( + downloader_ = std::make_unique( app_.getIOService(), j_, app_.config()); } } @@ -281,6 +293,7 @@ ShardArchiveHandler::start() return false; } + process_ = true; return next(lock); } @@ -292,7 +305,7 @@ ShardArchiveHandler::release() } bool -ShardArchiveHandler::next(std::lock_guard& l) +ShardArchiveHandler::next(std::lock_guard const& l) { if (archives_.empty()) { @@ -300,8 +313,51 @@ ShardArchiveHandler::next(std::lock_guard& l) return false; } - // Create a temp archive directory at the root + if (isStopping()) + return false; + auto const shardIndex{archives_.begin()->first}; + + // We use the sequence of the last validated ledger + // to determine whether or not we have stored a ledger 
+ // that comes after the last ledger in this shard. A + // later ledger must be present in order to reliably + // retrieve the hash of the shard's last ledger. + boost::optional expectedHash; + bool shouldHaveHash = false; + if (auto const seq = app_.getShardStore()->lastLedgerSeq(shardIndex); + (shouldHaveHash = app_.getLedgerMaster().getValidLedgerIndex() > seq)) + { + expectedHash = app_.getLedgerMaster().walkHashBySeq(seq); + } + + if (!expectedHash) + { + auto wrapper = + timerCounter_.wrap([this](boost::system::error_code const& ec) { + if (ec != boost::asio::error::operation_aborted) + { + std::lock_guard lock(m_); + this->next(lock); + } + }); + + if (!wrapper) + return onClosureFailed( + "failed to wrap closure for last ledger confirmation timer", l); + + if (!verificationScheduler_.retry(app_, shouldHaveHash, *wrapper)) + { + JLOG(j_.error()) << "failed to find last ledger hash for shard " + << shardIndex << ", maximum attempts reached"; + + return removeAndProceed(l); + } + + return true; + } + + // Create a temp archive directory at the root auto const dstDir{downloadDir_ / std::to_string(shardIndex)}; try { @@ -310,42 +366,42 @@ ShardArchiveHandler::next(std::lock_guard& l) catch (std::exception const& e) { JLOG(j_.error()) << "exception: " << e.what(); - remove(l); - return next(l); + return removeAndProceed(l); } // Download the archive. Process in another thread // to prevent holding up the lock if the downloader // sleeps. 
auto const& url{archives_.begin()->second}; - app_.getJobQueue().addJob( - jtCLIENT, - "ShardArchiveHandler", - [this, ptr = shared_from_this(), url, dstDir](Job&) { - if (!downloader_->download( - url.domain, - std::to_string(url.port.get_value_or(443)), - url.path, - 11, - dstDir / "archive.tar.lz4", - std::bind( - &ShardArchiveHandler::complete, - ptr, - std::placeholders::_1))) - { - std::lock_guard l(m_); - remove(l); - next(l); - } - }); + auto wrapper = jobCounter_.wrap([this, url, dstDir](Job&) { + if (!downloader_->download( + url.domain, + std::to_string(url.port.get_value_or(443)), + url.path, + 11, + dstDir / "archive.tar.lz4", + [this](path dstPath) { complete(dstPath); })) + { + std::lock_guard l(m_); + removeAndProceed(l); + } + }); + + if (!wrapper) + return onClosureFailed( + "failed to wrap closure for starting download", l); + + app_.getJobQueue().addJob(jtCLIENT, "ShardArchiveHandler", *wrapper); - process_ = true; return true; } void ShardArchiveHandler::complete(path dstPath) { + if (isStopping()) + return; + { std::lock_guard lock(m_); try @@ -354,51 +410,68 @@ ShardArchiveHandler::complete(path dstPath) { auto ar{archives_.begin()}; JLOG(j_.error()) - << "Downloading shard id " << ar->first << " form URL " + << "Downloading shard id " << ar->first << " from URL " << ar->second.domain << ar->second.path; - remove(lock); - next(lock); + removeAndProceed(lock); return; } } catch (std::exception const& e) { JLOG(j_.error()) << "exception: " << e.what(); - remove(lock); - next(lock); + removeAndProceed(lock); return; } } - // Process in another thread to not hold up the IO service - app_.getJobQueue().addJob( - jtCLIENT, - "ShardArchiveHandler", - [=, dstPath = std::move(dstPath), ptr = shared_from_this()](Job&) { - // If not synced then defer and retry - auto const mode{ptr->app_.getOPs().getOperatingMode()}; - if (mode != OperatingMode::FULL) - { - std::lock_guard lock(m_); - timer_.expires_from_now(static_cast( - 
(static_cast(OperatingMode::FULL) -
-                     static_cast(mode)) *
-                     10));
-                timer_.async_wait(
-                    [=, dstPath = std::move(dstPath), ptr = std::move(ptr)](
-                        boost::system::error_code const& ec) {
-                        if (ec != boost::asio::error::operation_aborted)
-                            ptr->complete(std::move(dstPath));
-                    });
-            }
+    auto wrapper = jobCounter_.wrap([=, dstPath = std::move(dstPath)](Job&) {
+        if (isStopping())
+            return;
+
+        // If not synced then defer and retry
+        auto const mode{app_.getOPs().getOperatingMode()};
+        if (mode != OperatingMode::FULL)
+        {
+            std::lock_guard lock(m_);
+            timer_.expires_from_now(static_cast(
+                (static_cast(OperatingMode::FULL) -
+                 static_cast(mode)) *
+                10));
+
+            auto wrapper =
+                timerCounter_.wrap([=, dstPath = std::move(dstPath)](
+                                       boost::system::error_code const& ec) {
+                    if (ec != boost::asio::error::operation_aborted)
+                        complete(std::move(dstPath));
+                });
+
+            if (!wrapper)
+                onClosureFailed(
+                    "failed to wrap closure for operating mode timer", lock);
             else
-            {
-                ptr->process(dstPath);
-                std::lock_guard lock(m_);
-                remove(lock);
-                next(lock);
-            }
-        });
+                timer_.async_wait(*wrapper);
+        }
+        else
+        {
+            process(dstPath);
+            std::lock_guard lock(m_);
+            removeAndProceed(lock);
+        }
+    });
+
+    if (!wrapper)
+    {
+        if (isStopping())
+            return;
+
+        JLOG(j_.error()) << "failed to wrap closure for process()";
+
+        std::lock_guard lock(m_);
+        removeAndProceed(lock); return;  // must not fall through: *wrapper below would dereference an empty optional
+    }
+
+    // Process in another thread to not hold up the IO service
+    app_.getJobQueue().addJob(jtCLIENT, "ShardArchiveHandler", *wrapper);
 }
 
 void
@@ -441,8 +514,10 @@ ShardArchiveHandler::process(path const& dstPath)
 }
 
 void
-ShardArchiveHandler::remove(std::lock_guard&)
+ShardArchiveHandler::remove(std::lock_guard const&)
 {
+    verificationScheduler_.reset();
+
     auto const shardIndex{archives_.begin()->first};
     app_.getShardStore()->removePreShard(shardIndex);
     archives_.erase(shardIndex);
@@ -466,8 +541,6 @@
 void
 ShardArchiveHandler::doRelease(std::lock_guard const&)
 {
-    process_ = 
false; - timer_.cancel(); for (auto const& ar : archives_) app_.getShardStore()->removePreShard(ar.first); @@ -493,6 +566,32 @@ ShardArchiveHandler::doRelease(std::lock_guard const&) } downloader_.reset(); + process_ = false; +} + +bool +ShardArchiveHandler::onClosureFailed( + std::string const& errorMsg, + std::lock_guard const& lock) +{ + if (isStopping()) + return false; + + JLOG(j_.error()) << errorMsg; + + return removeAndProceed(lock); +} + +bool +ShardArchiveHandler::removeAndProceed(std::lock_guard const& lock) +{ + remove(lock); + return next(lock); +} + +RecoveryHandler::RecoveryHandler(Application& app, Stoppable& parent) + : ShardArchiveHandler(app, parent) +{ } } // namespace RPC diff --git a/src/ripple/nodestore/impl/RetryFinalize.cpp b/src/ripple/rpc/impl/ShardVerificationScheduler.cpp similarity index 63% rename from src/ripple/nodestore/impl/RetryFinalize.cpp rename to src/ripple/rpc/impl/ShardVerificationScheduler.cpp index 2b87c0526..ad6b6df7b 100644 --- a/src/ripple/nodestore/impl/RetryFinalize.cpp +++ b/src/ripple/rpc/impl/ShardVerificationScheduler.cpp @@ -18,36 +18,51 @@ //============================================================================== #include -#include +#include namespace ripple { -namespace NodeStore { +namespace RPC { + +ShardVerificationScheduler::ShardVerificationScheduler( + std::chrono::seconds retryInterval, + std::uint32_t maxAttempts) + : retryInterval_( + (retryInterval == std::chrono::seconds(0) ? defaultRetryInterval_ + : retryInterval)) + , maxAttempts_(maxAttempts == 0 ? 
defaultmaxAttempts_ : maxAttempts) +{ +} bool -RetryFinalize::retry( +ShardVerificationScheduler::retry( Application& app, - retryFunction f, - std::uint32_t shardIndex) + bool shouldHaveHash, + retryFunction f) { if (numAttempts_ >= maxAttempts_) return false; - // Retry attempts only count when we have a validated ledger - if (app.getLedgerMaster().getValidatedLedger()) + // Retry attempts only count when we + // have a validated ledger with a + // sequence later than the shard's + // last ledger. + if (shouldHaveHash) ++numAttempts_; if (!timer_) timer_ = std::make_unique(app.getIOService()); timer_->expires_from_now(retryInterval_); - timer_->async_wait([f{std::move(f)}, shardIndex_ = shardIndex]( - boost::system::error_code const& ec) { - if (ec != boost::asio::error::operation_aborted) - f(shardIndex_); - }); + timer_->async_wait(f); return true; } -} // namespace NodeStore +void +ShardVerificationScheduler::reset() +{ + numAttempts_ = 0; +} + +} // namespace RPC } // namespace ripple diff --git a/src/test/net/SSLHTTPDownloader_test.cpp b/src/test/net/DatabaseDownloader_test.cpp similarity index 98% rename from src/test/net/SSLHTTPDownloader_test.cpp rename to src/test/net/DatabaseDownloader_test.cpp index 233b59811..b81485a22 100644 --- a/src/test/net/SSLHTTPDownloader_test.cpp +++ b/src/test/net/DatabaseDownloader_test.cpp @@ -29,7 +29,7 @@ namespace ripple { namespace test { -class SSLHTTPDownloader_test : public beast::unit_test::suite +class DatabaseDownloader_test : public beast::unit_test::suite { TrustedPublisherServer createServer(jtx::Env& env, bool ssl = true) @@ -80,7 +80,7 @@ class SSLHTTPDownloader_test : public beast::unit_test::suite { test::StreamSink sink_; beast::Journal journal_; - // The SSLHTTPDownloader must be created as shared_ptr + // The DatabaseDownloader must be created as shared_ptr // because it uses shared_from_this std::shared_ptr ptr_; @@ -265,6 +265,6 @@ public: } }; -BEAST_DEFINE_TESTSUITE(SSLHTTPDownloader, net, ripple); 
+BEAST_DEFINE_TESTSUITE(DatabaseDownloader, net, ripple); } // namespace test } // namespace ripple diff --git a/src/test/rpc/ShardArchiveHandler_test.cpp b/src/test/rpc/ShardArchiveHandler_test.cpp index 4d8ad251b..497acf3d1 100644 --- a/src/test/rpc/ShardArchiveHandler_test.cpp +++ b/src/test/rpc/ShardArchiveHandler_test.cpp @@ -34,7 +34,7 @@ namespace test { class ShardArchiveHandler_test : public beast::unit_test::suite { - using Downloads = std::vector>; + using Downloads = std::vector>; TrustedPublisherServer createServer(jtx::Env& env, bool ssl = true) @@ -49,196 +49,210 @@ class ShardArchiveHandler_test : public beast::unit_test::suite } public: + // Test the shard downloading module by initiating + // and completing a download and verifying the + // contents of the state database. void - testStateDatabase1() + testSingleDownloadAndStateDB() { - testcase("testStateDatabase1"); + testcase("testSingleDownloadAndStateDB"); + + beast::temp_dir tempDir; + + auto c = jtx::envconfig(); + auto& section = c->section(ConfigSection::shardDatabase()); + section.set("path", tempDir.path()); + section.set("max_size_gb", "100"); + c->setupControl(true, true, true); + + jtx::Env env(*this, std::move(c)); + auto handler = env.app().getShardArchiveHandler(); + BEAST_EXPECT(handler); + BEAST_EXPECT(dynamic_cast(handler) == nullptr); + + std::string const rawUrl = "https://foo:443/1.tar.lz4"; + parsedURL url; + + parseUrl(url, rawUrl); + handler->add(1, {url, rawUrl}); { - beast::temp_dir tempDir; + std::lock_guard lock(handler->m_); - auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_size_gb", "100"); - c->setupControl(true, true, true); + auto& session{handler->sqliteDB_->getSession()}; - jtx::Env env(*this, std::move(c)); - auto handler = RPC::ShardArchiveHandler::getInstance( - env.app(), env.app().getJobQueue()); - BEAST_EXPECT(handler); + soci::rowset rs = + (session.prepare 
<< "SELECT * FROM State;"); - BEAST_EXPECT(handler->init()); + std::uint64_t rowCount = 0; - std::string const rawUrl = "https://foo:443/1.tar.lz4"; + for (auto it = rs.begin(); it != rs.end(); ++it, ++rowCount) + { + BEAST_EXPECT(it->get(0) == 1); + BEAST_EXPECT(it->get(1) == rawUrl); + } + + BEAST_EXPECT(rowCount == 1); + } + + handler->release(); + } + + // Test the shard downloading module by initiating + // and completing three downloads and verifying + // the contents of the state database. + void + testDownloadsAndStateDB() + { + testcase("testDownloadsAndStateDB"); + + beast::temp_dir tempDir; + + auto c = jtx::envconfig(); + auto& section = c->section(ConfigSection::shardDatabase()); + section.set("path", tempDir.path()); + section.set("max_size_gb", "100"); + c->setupControl(true, true, true); + + jtx::Env env(*this, std::move(c)); + auto handler = env.app().getShardArchiveHandler(); + BEAST_EXPECT(handler); + BEAST_EXPECT(dynamic_cast(handler) == nullptr); + + Downloads const dl = { + {1, "https://foo:443/1.tar.lz4"}, + {2, "https://foo:443/2.tar.lz4"}, + {3, "https://foo:443/3.tar.lz4"}}; + + for (auto const& entry : dl) + { parsedURL url; - - parseUrl(url, rawUrl); - handler->add(1, {url, rawUrl}); - - { - std::lock_guard lock(handler->m_); - - auto& session{handler->sqliteDB_->getSession()}; - - soci::rowset rs = - (session.prepare << "SELECT * FROM State;"); - - uint64_t rowCount = 0; - - for (auto it = rs.begin(); it != rs.end(); ++it, ++rowCount) - { - BEAST_EXPECT(it->get(0) == 1); - BEAST_EXPECT(it->get(1) == rawUrl); - } - - BEAST_EXPECT(rowCount == 1); - } - - handler->release(); + parseUrl(url, entry.second); + handler->add(entry.first, {url, entry.second}); } - // Destroy the singleton so we start fresh in - // the next testcase. 
- RPC::ShardArchiveHandler::instance_.reset(); - } - - void - testStateDatabase2() - { - testcase("testStateDatabase2"); - { - beast::temp_dir tempDir; + std::lock_guard lock(handler->m_); - auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_size_gb", "100"); - c->setupControl(true, true, true); + auto& session{handler->sqliteDB_->getSession()}; + soci::rowset rs = + (session.prepare << "SELECT * FROM State;"); - jtx::Env env(*this, std::move(c)); - auto handler = RPC::ShardArchiveHandler::getInstance( - env.app(), env.app().getJobQueue()); - BEAST_EXPECT(handler); - - BEAST_EXPECT(handler->init()); - - Downloads const dl = { - {1, "https://foo:443/1.tar.lz4"}, - {2, "https://foo:443/2.tar.lz4"}, - {3, "https://foo:443/3.tar.lz4"}}; - - for (auto const& entry : dl) + std::uint64_t pos = 0; + for (auto it = rs.begin(); it != rs.end(); ++it, ++pos) { - parsedURL url; - parseUrl(url, entry.second); - handler->add(entry.first, {url, entry.second}); + BEAST_EXPECT(it->get(0) == dl[pos].first); + BEAST_EXPECT(it->get(1) == dl[pos].second); } - { - std::lock_guard lock(handler->m_); - - auto& session{handler->sqliteDB_->getSession()}; - soci::rowset rs = - (session.prepare << "SELECT * FROM State;"); - - uint64_t pos = 0; - for (auto it = rs.begin(); it != rs.end(); ++it, ++pos) - { - BEAST_EXPECT(it->get(0) == dl[pos].first); - BEAST_EXPECT(it->get(1) == dl[pos].second); - } - - BEAST_EXPECT(pos == dl.size()); - } - - handler->release(); + BEAST_EXPECT(pos == dl.size()); } - // Destroy the singleton so we start fresh in - // the next testcase. - RPC::ShardArchiveHandler::instance_.reset(); + handler->release(); } + // Test the shard downloading module by initiating + // and completing ten downloads and verifying the + // contents of the filesystem and the handler's + // archives. 
void - testStateDatabase3() + testDownloadsAndFileSystem() { - testcase("testStateDatabase3"); + testcase("testDownloadsAndFileSystem"); + beast::temp_dir tempDir; + + auto c = jtx::envconfig(); + auto& section = c->section(ConfigSection::shardDatabase()); + section.set("path", tempDir.path()); + section.set("max_size_gb", "100"); + section.set("ledgers_per_shard", "256"); + c->setupControl(true, true, true); + + jtx::Env env(*this, std::move(c)); + + std::uint8_t const numberOfDownloads = 10; + + // Create some ledgers so that the ShardArchiveHandler + // can verify the last ledger hash for the shard + // downloads. + for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() * + (numberOfDownloads + 1); + ++i) { - beast::temp_dir tempDir; - - auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_size_gb", "100"); - c->setupControl(true, true, true); - - jtx::Env env(*this, std::move(c)); - auto handler = RPC::ShardArchiveHandler::getInstance( - env.app(), env.app().getJobQueue()); - BEAST_EXPECT(handler); - - BEAST_EXPECT(handler->init()); - - auto server = createServer(env); - auto host = server.local_endpoint().address().to_string(); - auto port = std::to_string(server.local_endpoint().port()); - server.stop(); - - Downloads const dl = [&host, &port] { - Downloads ret; - - for (int i = 1; i <= 10; ++i) - { - ret.push_back( - {i, - (boost::format("https://%s:%d/%d.tar.lz4") % host % - port % i) - .str()}); - } - - return ret; - }(); - - for (auto const& entry : dl) - { - parsedURL url; - parseUrl(url, entry.second); - handler->add(entry.first, {url, entry.second}); - } - - BEAST_EXPECT(handler->start()); - - auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory( - env.app().config()); - - std::unique_lock lock(handler->m_); - - BEAST_EXPECT( - boost::filesystem::exists(stateDir) || - handler->archives_.empty()); - - while (!handler->archives_.empty()) - { 
- lock.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - lock.lock(); - } - - BEAST_EXPECT(!boost::filesystem::exists(stateDir)); + env.close(); } - // Destroy the singleton so we start fresh in - // the next testcase. - RPC::ShardArchiveHandler::instance_.reset(); + auto handler = env.app().getShardArchiveHandler(); + BEAST_EXPECT(handler); + BEAST_EXPECT(dynamic_cast(handler) == nullptr); + + auto server = createServer(env); + auto host = server.local_endpoint().address().to_string(); + auto port = std::to_string(server.local_endpoint().port()); + server.stop(); + + Downloads const dl = [count = numberOfDownloads, &host, &port] { + Downloads ret; + + for (int i = 1; i <= count; ++i) + { + ret.push_back( + {i, + (boost::format("https://%s:%d/%d.tar.lz4") % host % port % + i) + .str()}); + } + + return ret; + }(); + + for (auto const& entry : dl) + { + parsedURL url; + parseUrl(url, entry.second); + handler->add(entry.first, {url, entry.second}); + } + + BEAST_EXPECT(handler->start()); + + auto stateDir = + RPC::ShardArchiveHandler::getDownloadDirectory(env.app().config()); + + std::unique_lock lock(handler->m_); + + BEAST_EXPECT( + boost::filesystem::exists(stateDir) || handler->archives_.empty()); + + using namespace std::chrono_literals; + auto waitMax = 60s; + + while (!handler->archives_.empty()) + { + lock.unlock(); + std::this_thread::sleep_for(1s); + + if (waitMax -= 1s; waitMax <= 0s) + { + BEAST_EXPECT(false); + break; + } + + lock.lock(); + } + + BEAST_EXPECT(!boost::filesystem::exists(stateDir)); } + // Test the shard downloading module by initiating + // and completing ten downloads and verifying the + // contents of the filesystem and the handler's + // archives. Then restart the application and ensure + // that the handler is created and started automatically. 
void - testStateDatabase4() + testDownloadsAndRestart() { - testcase("testStateDatabase4"); + testcase("testDownloadsAndRestart"); beast::temp_dir tempDir; @@ -247,24 +261,37 @@ public: auto& section = c->section(ConfigSection::shardDatabase()); section.set("path", tempDir.path()); section.set("max_size_gb", "100"); + section.set("ledgers_per_shard", "256"); c->setupControl(true, true, true); jtx::Env env(*this, std::move(c)); - auto handler = RPC::ShardArchiveHandler::getInstance( - env.app(), env.app().getJobQueue()); - BEAST_EXPECT(handler); - BEAST_EXPECT(handler->init()); + std::uint8_t const numberOfDownloads = 10; + + // Create some ledgers so that the ShardArchiveHandler + // can verify the last ledger hash for the shard + // downloads. + for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() * + (numberOfDownloads + 1); + ++i) + { + env.close(); + } + + auto handler = env.app().getShardArchiveHandler(); + BEAST_EXPECT(handler); + BEAST_EXPECT( + dynamic_cast(handler) == nullptr); auto server = createServer(env); auto host = server.local_endpoint().address().to_string(); auto port = std::to_string(server.local_endpoint().port()); server.stop(); - Downloads const dl = [&host, &port] { + Downloads const dl = [count = numberOfDownloads, &host, &port] { Downloads ret; - for (int i = 1; i <= 10; ++i) + for (int i = 1; i <= count; ++i) { ret.push_back( {i, @@ -298,10 +325,20 @@ public: boost::filesystem::exists(stateDir) || handler->archives_.empty()); + using namespace std::chrono_literals; + auto waitMax = 60s; + while (!handler->archives_.empty()) { lock.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(1s); + + if (waitMax -= 1s; waitMax <= 0s) + { + BEAST_EXPECT(false); + break; + } + lock.lock(); } @@ -314,24 +351,31 @@ public: stateDir / stateDBName); } - // Destroy the singleton so we start fresh in - // the new scope. 
- RPC::ShardArchiveHandler::instance_.reset(); - auto c = jtx::envconfig(); auto& section = c->section(ConfigSection::shardDatabase()); section.set("path", tempDir.path()); section.set("max_size_gb", "100"); + section.set("ledgers_per_shard", "256"); + section.set("shard_verification_retry_interval", "1"); + section.set("shard_verification_max_attempts", "10000"); c->setupControl(true, true, true); jtx::Env env(*this, std::move(c)); - while (!RPC::ShardArchiveHandler::hasInstance()) - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::uint8_t const numberOfDownloads = 10; - BEAST_EXPECT(RPC::ShardArchiveHandler::hasInstance()); + // Create some ledgers so that the ShardArchiveHandler + // can verify the last ledger hash for the shard + // downloads. + for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() * + (numberOfDownloads + 1); + ++i) + { + env.close(); + } - auto handler = RPC::ShardArchiveHandler::getInstance(); + auto handler = env.app().getShardArchiveHandler(); + BEAST_EXPECT(dynamic_cast(handler) != nullptr); auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(env.app().config()); @@ -341,10 +385,20 @@ public: BEAST_EXPECT( boost::filesystem::exists(stateDir) || handler->archives_.empty()); + using namespace std::chrono_literals; + auto waitMax = 60s; + while (!handler->archives_.empty()) { lock.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(1s); + + if (waitMax -= 1s; waitMax <= 0s) + { + BEAST_EXPECT(false); + break; + } + lock.lock(); } @@ -354,10 +408,10 @@ public: void run() override { - testStateDatabase1(); - testStateDatabase2(); - testStateDatabase3(); - testStateDatabase4(); + testSingleDownloadAndStateDB(); + testDownloadsAndStateDB(); + testDownloadsAndFileSystem(); + testDownloadsAndRestart(); } };