diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index d55ce5580..ad9925b10 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -487,6 +487,7 @@ target_sources (rippled PRIVATE main sources: subdir: net #]===============================] + src/ripple/net/impl/DatabaseDownloader.cpp src/ripple/net/impl/HTTPClient.cpp src/ripple/net/impl/InfoSub.cpp src/ripple/net/impl/RPCCall.cpp @@ -918,6 +919,7 @@ target_sources (rippled PRIVATE src/test/rpc/RPCOverload_test.cpp src/test/rpc/RobustTransaction_test.cpp src/test/rpc/ServerInfo_test.cpp + src/test/rpc/ShardArchiveHandler_test.cpp src/test/rpc/Status_test.cpp src/test/rpc/Submit_test.cpp src/test/rpc/Subscribe_test.cpp diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 93042ce5e..d8f223d2e 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -64,6 +64,7 @@ #include #include #include +#include #include #include @@ -1610,6 +1611,53 @@ bool ApplicationImp::setup() } } + if (shardStore_) + { + using namespace boost::filesystem; + + auto stateDb( + RPC::ShardArchiveHandler::getDownloadDirectory(*config_) + / stateDBName); + + try + { + if (exists(stateDb) && + is_regular_file(stateDb) && + !RPC::ShardArchiveHandler::hasInstance()) + { + auto handler = RPC::ShardArchiveHandler::recoverInstance( + *this, + *m_jobQueue); + + assert(handler); + + if (!handler->initFromDB()) + { + JLOG(m_journal.fatal()) + << "Failed to initialize ShardArchiveHandler."; + + return false; + } + + if (!handler->start()) + { + JLOG(m_journal.fatal()) + << "Failed to start ShardArchiveHandler."; + + return false; + } + } + } + catch(std::exception const& e) + { + JLOG(m_journal.fatal()) + << "Exception when starting ShardArchiveHandler from " + "state database: " << e.what(); + + return false; + } + } + return true; } diff --git a/src/ripple/app/main/DBInit.h b/src/ripple/app/main/DBInit.h index 
af693f708..69d9ccf45 100644 --- a/src/ripple/app/main/DBInit.h +++ b/src/ripple/app/main/DBInit.h @@ -181,6 +181,45 @@ std::array WalletDBInit {{ "END TRANSACTION;" }}; +//////////////////////////////////////////////////////////////////////////////// + +static constexpr auto stateDBName {"state.db"}; + +static constexpr +std::array DownloaderDBPragma +{{ + "PRAGMA synchronous=FULL;", + "PRAGMA journal_mode=DELETE;" +}}; + +static constexpr +std::array ShardArchiveHandlerDBInit +{{ + "BEGIN TRANSACTION;", + + "CREATE TABLE IF NOT EXISTS State ( \ + ShardIndex INTEGER PRIMARY KEY, \ + URL TEXT \ + );", + + "END TRANSACTION;" +}}; + +static constexpr +std::array DatabaseBodyDBInit +{{ + "BEGIN TRANSACTION;", + + "CREATE TABLE IF NOT EXISTS download ( \ + Path TEXT, \ + Data BLOB, \ + Size BIGINT UNSIGNED, \ + Part BIGINT UNSIGNED PRIMARY KEY \ + );", + + "END TRANSACTION;" +}}; + } // ripple #endif diff --git a/src/ripple/core/DatabaseCon.h b/src/ripple/core/DatabaseCon.h index b4dbf6b6d..0090df52b 100644 --- a/src/ripple/core/DatabaseCon.h +++ b/src/ripple/core/DatabaseCon.h @@ -102,18 +102,17 @@ public: boost::filesystem::path pPath = useTempFiles ? 
"" : (setup.dataDir / DBName); - open(session_, "sqlite", pPath.string()); + init(pPath, pragma, initSQL); + } - for (auto const& p : pragma) - { - soci::statement st = session_.prepare << p; - st.execute(true); - } - for (auto const& sql : initSQL) - { - soci::statement st = session_.prepare << sql; - st.execute(true); - } + template + DatabaseCon( + boost::filesystem::path const& dataDir, + std::string const& DBName, + std::array const& pragma, + std::array const& initSQL) + { + init((dataDir / DBName), pragma, initSQL); } soci::session& getSession() @@ -129,6 +128,27 @@ public: void setupCheckpointing (JobQueue*, Logs&); private: + + template + void + init(boost::filesystem::path const& pPath, + std::array const& pragma, + std::array const& initSQL) + { + open(session_, "sqlite", pPath.string()); + + for (auto const& p : pragma) + { + soci::statement st = session_.prepare << p; + st.execute(true); + } + for (auto const& sql : initSQL) + { + soci::statement st = session_.prepare << sql; + st.execute(true); + } + } + LockedSociSession::mutex lock_; soci::session session_; diff --git a/src/ripple/net/DatabaseBody.h b/src/ripple/net/DatabaseBody.h new file mode 100644 index 000000000..c616aeb9b --- /dev/null +++ b/src/ripple/net/DatabaseBody.h @@ -0,0 +1,170 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_NET_DATABASEBODY_H +#define RIPPLE_NET_DATABASEBODY_H + +#include +#include +#include +#include +#include + +namespace ripple { + +struct DatabaseBody +{ + // Algorithm for storing buffers when parsing. + class reader; + + // The type of the @ref message::body member. + class value_type; + + /** Returns the size of the body + + @param body The database body to use + */ + static std::uint64_t + size(value_type const& body); +}; + +class DatabaseBody::value_type +{ + // This body container holds a connection to the + // database, and also caches the size when set. + + friend class reader; + friend struct DatabaseBody; + + // The cached file size + std::uint64_t file_size_ = 0; + boost::filesystem::path path_; + std::unique_ptr conn_; + std::string batch_; + std::shared_ptr strand_; + std::mutex m_; + std::condition_variable c_; + uint64_t handler_count_ = 0; + uint64_t part_ = 0; + bool closing_ = false; + +public: + /// Destructor + ~value_type() = default; + + /// Constructor + value_type() = default; + + /// Returns `true` if the file is open + bool + is_open() const + { + return bool{conn_}; + } + + /// Returns the size of the file if open + std::uint64_t + size() const + { + return file_size_; + } + + /// Close the file if open + void + close(); + + /** Open the download database for the file at the given path + + @param path The utf-8 encoded path to the file + @param config The configuration used to set up the database connection + @param io_service The io_service used to schedule database writes + + @param ec Set to the error, if any occurred + */ + void + open( + boost::filesystem::path path, + Config const& config, 
boost::asio::io_service& io_service, + boost::system::error_code& ec); +}; + +/** Algorithm for storing buffers when parsing. + + Objects of this type are created during parsing + to store incoming buffers representing the body. +*/ +class DatabaseBody::reader +{ + value_type& body_; // The body we are writing to + + static const uint32_t FLUSH_SIZE = 50000000; + static const uint8_t MAX_HANDLERS = 3; + static const uint16_t MAX_ROW_SIZE_PAD = 500; + +public: + // Constructor. + // + // This is called after the header is parsed and + // indicates that a non-zero sized body may be present. + // `h` holds the received message headers. + // `b` is an instance of `DatabaseBody`. + // + template + explicit reader( + boost::beast::http::header& h, + value_type& b); + + // Initializer + // + // This is called before the body is parsed and + // gives the reader a chance to do something that might + // need to return an error code. It informs us of + // the payload size (`content_length`) which we can + // optionally use for optimization. + // + void + init(boost::optional const&, boost::system::error_code& ec); + + // This function is called one or more times to store + // buffer sequences corresponding to the incoming body. + // + template + std::size_t + put(ConstBufferSequence const& buffers, boost::system::error_code& ec); + + void + do_put(std::string data); + + // This function is called when writing is complete. + // It is an opportunity to perform any final actions + // which might fail, in order to return an error code. + // Operations that might fail should not be attempted in + // destructors, since an exception thrown from there + // would terminate the program. 
+ // + void + finish(boost::system::error_code& ec); +}; + +} // namespace ripple + +#include + +#endif // RIPPLE_NET_DATABASEBODY_H diff --git a/src/ripple/net/DatabaseDownloader.h b/src/ripple/net/DatabaseDownloader.h new file mode 100644 index 000000000..ec86242c3 --- /dev/null +++ b/src/ripple/net/DatabaseDownloader.h @@ -0,0 +1,60 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#ifndef RIPPLE_NET_DATABASEDOWNLOADER_H +#define RIPPLE_NET_DATABASEDOWNLOADER_H + +#include +#include + +namespace ripple { + +class DatabaseDownloader : public SSLHTTPDownloader +{ +public: + DatabaseDownloader( + boost::asio::io_service& io_service, + beast::Journal j, + Config const& config); + +private: + static const uint8_t MAX_PATH_LEN = std::numeric_limits::max(); + + std::shared_ptr + getParser( + boost::filesystem::path dstPath, + std::function complete, + boost::system::error_code& ec) override; + + bool + checkPath(boost::filesystem::path const& dstPath) override; + + void + closeBody(std::shared_ptr p) override; + + uint64_t + size(std::shared_ptr p) override; + + Config const& config_; + boost::asio::io_service& io_service_; +}; + +} // namespace ripple + +#endif // RIPPLE_NET_DATABASEDOWNLOADER_H diff --git a/src/ripple/net/SSLHTTPDownloader.h b/src/ripple/net/SSLHTTPDownloader.h index 893043ce4..b70383373 100644 --- a/src/ripple/net/SSLHTTPDownloader.h +++ b/src/ripple/net/SSLHTTPDownloader.h @@ -50,7 +50,8 @@ public: SSLHTTPDownloader( boost::asio::io_service& io_service, beast::Journal j, - Config const& config); + Config const& config, + bool isPaused = false); bool download( @@ -61,13 +62,36 @@ public: boost::filesystem::path const& dstPath, std::function complete); + void + onStop(); + + virtual + ~SSLHTTPDownloader() = default; + +protected: + + using parser = boost::beast::http::basic_parser; + + beast::Journal const j_; + + bool + fail( + boost::filesystem::path dstPath, + std::function const& complete, + boost::system::error_code const& ec, + std::string const& errMsg, + std::shared_ptr parser = nullptr); + private: HTTPClientSSLContext ssl_ctx_; boost::asio::io_service::strand strand_; boost::optional< boost::asio::ssl::stream> stream_; boost::beast::flat_buffer read_buf_; - beast::Journal const j_; + std::atomic isStopped_; + bool sessionActive_; + 
std::mutex m_; + std::condition_variable c_; void do_session( @@ -79,12 +103,25 @@ private: std::function complete, boost::asio::yield_context yield); - void - fail( + virtual + std::shared_ptr + getParser( boost::filesystem::path dstPath, - std::function const& complete, - boost::system::error_code const& ec, - std::string const& errMsg); + std::function complete, + boost::system::error_code & ec) = 0; + + virtual + bool + checkPath( + boost::filesystem::path const& dstPath) = 0; + + virtual + void + closeBody(std::shared_ptr p) = 0; + + virtual + uint64_t + size(std::shared_ptr p) = 0; }; } // ripple diff --git a/src/ripple/net/ShardDownloader.md b/src/ripple/net/ShardDownloader.md new file mode 100644 index 000000000..9ef643a96 --- /dev/null +++ b/src/ripple/net/ShardDownloader.md @@ -0,0 +1,263 @@ +# Shard Downloader Process + +## Overview + +This document describes the mechanics of the `SSLHTTPDownloader`, a class that performs the task of downloading shards from remote web servers via +SSL HTTP. The downloader utilizes a strand (`boost::asio::io_service::strand`) to ensure that downloads are never executed concurrently. Hence, if a download is in progress when another download is initiated, the second download will be queued and invoked only when the first download is completed. + +## New Features + +The downloader was recently (March 2020) modified to provide some key features: + +- The ability to stop downloads during a graceful shutdown. +- The ability to resume partial downloads after a crash or shutdown. +- *(Deferred) The ability to download from multiple servers to a single file.* + +## Classes + +Much of the shard downloading process concerns the following classes: + +- `SSLHTTPDownloader` + + This is a generic class designed for serially executing downloads via HTTP SSL. + +- `ShardArchiveHandler` + + This class uses the `SSLHTTPDownloader` to fetch shards from remote web servers. 
Additionally, the archive handler performs sanity checks on the downloaded files and imports the validated files into the local shard store. + + The `ShardArchiveHandler` exposes a simple public interface: + + ```C++ + /** Add an archive to be downloaded and imported. + @param shardIndex the index of the shard to be imported. + @param url the location of the archive. + @return `true` if successfully added. + @note Returns false if called while downloading. + */ + bool + add(std::uint32_t shardIndex, std::pair&& url); + + /** Starts downloading and importing archives. */ + bool + start(); + ``` + + When a client submits a `download_shard` command via the RPC interface, each of the requested files is registered with the handler via the `add` method. After all the files have been registered, the handler's `start` method is invoked, which in turn creates an instance of the `SSLHTTPDownloader` and begins the first download. When the download is completed, the downloader invokes the handler's `complete` method, which will initiate the download of the next file, or simply return if there are no more downloads to process. When `complete` is invoked with no remaining files to be downloaded, the handler and downloader are not destroyed automatically, but persist for the duration of the application. + +Additionally, we leverage a novel class to provide customized parsing for downloaded files: + +- `DatabaseBody` + + This class will define a custom message body type, allowing an `http::response_parser` to write to a SQLite database rather than to a flat file. This class is discussed in further detail in the Recovery section. + +## Execution Concept + +This section describes in greater detail how the key features of the downloader are implemented in C++ using the `boost::asio` framework. + +##### Member Variables: + +The variables shown here are members of the `SSLHTTPDownloader` class and +will be used in the following code examples. 
+ +```c++ +using boost::asio::ssl::stream; +using boost::asio::ip::tcp::socket; + +stream stream_; +std::condition_variable c_; +std::atomic isStopped_; +``` + +### Graceful Shutdowns + +##### Thread 1: + +A graceful shutdown begins when the `onStop()` method of the `ShardArchiveHandler` is invoked: + +```c++ +void +ShardArchiveHandler::onStop() +{ + std::lock_guard lock(m_); + + if (downloader_) + { + downloader_->onStop(); + downloader_.reset(); + } + + stopped(); +} +``` + +Inside of `SSLHTTPDownloader::onStop()`, if a download is currently in progress, the `isStopped_` member variable is set and the thread waits for the download to stop: + +```c++ +void +SSLHTTPDownloader::onStop() +{ + std::unique_lock lock(m_); + + isStopped_ = true; + + if(sessionActive_) + { + // Wait for the handler to exit. + c_.wait(lock, + [this]() + { + return !sessionActive_; + }); + } +} +``` + +##### Thread 2: + +The graceful shutdown is realized when the thread executing the download polls `isStopped_` after this variable has been set to `true`. Polling only occurs while the file is being downloaded, in between calls to `async_read_some()`. The stop takes effect when the socket is closed and the handler function ( `do_session()` ) is exited. + +```c++ +void SSLHTTPDownloader::do_session() +{ + + // (Connection initialization logic) + + . + . + . + + // (In between calls to async_read_some): + if(isStopped_.load()) + { + close(p); + return exit(); + } + + . + . + . + + break; +} +``` + +### Recovery + +Persisting the current state of both the archive handler and the downloader is achieved by leveraging a SQLite database rather than flat files, as the database protects against data corruption that could result from a system crash. + +##### ShardArchiveHandler + +Although `SSLHTTPDownloader` is a generic class that could be used to download a variety of file types, currently it is used exclusively by the `ShardArchiveHandler` to download shards. 
In order to provide resilience, the `ShardArchiveHandler` will utilize a SQLite database to preserve its current state whenever there are active, paused, or queued downloads. The `shard_db` section in the configuration file allows users to specify the location of the database to use for this purpose. + +###### SQLite Table Format + +| Index | URL | +|:-----:|:-----------------------------------:| +| 1 | https://example.com/1.tar.lz4 | +| 2 | https://example.com/2.tar.lz4 | +| 5 | https://example.com/5.tar.lz4 | + +##### SSLHTTPDownloader + +While the archive handler maintains a list of all partial and queued downloads, the `SSLHTTPDownloader` stores the raw bytes of the file currently being downloaded. The partially downloaded file will be represented as one or more `BLOB` entries in a SQLite database. As the maximum size of a `BLOB` entry is currently limited to roughly 2.1 GB, a 5 GB shard file for instance will occupy three database entries upon completion. + +###### SQLite Table Format + +Since downloads execute serially by design, the entries in this table always correspond to the content of a single file. + +| Bytes | Size | Part | +|:------:|:----------:|:----:| +| 0x... | 2147483647 | 0 | +| 0x... | 2147483647 | 1 | +| 0x... | 705032706 | 2 | + +##### Config File Entry +The `download_path` field of the `shard_db` entry will be used to determine where to store the recovery database. If this field is omitted, the `path` field will be used instead. + +```dosini +# This is the persistent datastore for shards. It is important for the health +# of the ripple network that rippled operators shard as much as practical. +# NuDB requires SSD storage. 
Helpful information can be found here +# https://ripple.com/build/history-sharding +[shard_db] +type=NuDB +path=/var/lib/rippled/db/shards/nudb +download_path=/var/lib/rippled/db/shards/ +max_size_gb=50 +``` + +##### Resuming Partial Downloads +When resuming downloads after a crash or other interruption, the `SSLHTTPDownloader` will utilize the `range` field of the HTTP header to download only the remainder of the partially downloaded file. + +```C++ +auto downloaded = getPartialFileSize(); +auto total = getTotalFileSize(); + +http::request req {http::verb::head, + target, + version}; + +if (downloaded < total) +{ + // If we already downloaded 1000 bytes to the partial file, + // the range header will look like: + // Range: "bytes=1000-" + req.set(http::field::range, "bytes=" + to_string(downloaded) + "-"); +} +else if(downloaded == total) +{ + // Download is already complete. (The interruption must + // have occurred after file was downloaded but before + // the state file was updated.) +} +else +{ + // The size of the partially downloaded file exceeds + // the total download size. Error condition. Handle + // appropriately. +} +``` + +##### DatabaseBody + +Previously, the `SSLHTTPDownloader` leveraged an `http::response_parser` instantiated with an `http::file_body`. The `file_body` class declares a nested type, `reader`, which does the task of writing HTTP message payloads (constituting a requested file) to the filesystem. In order for the `http::response_parser` to interface with the database, we implement a custom body type that declares a nested `reader` type which has been outfitted to persist octets received from the remote host to a local SQLite database. 
The code snippet below illustrates the customization points available to user-defined body types: + +```C++ +/// Defines a Body type +struct body +{ + /// This determines the return type of the `message::body` member function + using value_type = ...; + + /// An optional function, returns the body's payload size (which may be zero) + static + std::uint64_t + size(value_type const& v); + + /// The algorithm used for extracting buffers + class reader; + + /// The algorithm used for inserting buffers + class writer; +} + +``` + +The method invoked to write data to the filesystem (or SQLite database in our case) has the following signature: + +```C++ +std::size_t +body::reader::put(ConstBufferSequence const& buffers, error_code& ec); +``` + +## Sequence Diagram + +This sequence diagram demonstrates a scenario wherein the `ShardArchiveHandler` leverages the state persisted in the database to recover from a crash and resume the scheduled downloads. + +![alt_text](./images/interrupt_sequence.png "Resuming downloads post abort") + +## State Diagram + +This diagram illustrates the various states of the Shard Downloader module. 
+![alt_text](./images/states.png "Shard Downloader states")
diff --git a/src/ripple/net/images/interrupt_sequence.png b/src/ripple/net/images/interrupt_sequence.png
new file mode 100644
index 000000000..87cc3c8d7
Binary files /dev/null and b/src/ripple/net/images/interrupt_sequence.png differ
diff --git a/src/ripple/net/images/states.png b/src/ripple/net/images/states.png
new file mode 100644
index 000000000..d982955dd
Binary files /dev/null and b/src/ripple/net/images/states.png differ
diff --git a/src/ripple/net/impl/DatabaseBody.ipp b/src/ripple/net/impl/DatabaseBody.ipp
new file mode 100644
index 000000000..d5b854f4b
--- /dev/null
+++ b/src/ripple/net/impl/DatabaseBody.ipp
@@ -0,0 +1,347 @@
+//------------------------------------------------------------------------------
+/*
+    This file is part of rippled: https://github.com/ripple/rippled
+    Copyright (c) 2020 Ripple Labs Inc.
+
+    Permission to use, copy, modify, and/or distribute this software for any
+    purpose with or without fee is hereby granted, provided that the above
+    copyright notice and this permission notice appear in all copies.
+
+    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+    ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+namespace ripple {
+
+// Wait for outstanding write handlers to drain (handlers that start
+// after closing_ is set return without writing), then release the
+// database connection.
+inline void
+DatabaseBody::value_type::close()
+{
+    {
+        std::unique_lock lock(m_);
+
+        // Stop all scheduled and currently
+        // executing handlers before closing.
+        if (handler_count_)
+        {
+            closing_ = true;
+
+            auto predicate = [&] { return !handler_count_; };
+            c_.wait(lock, predicate);
+        }
+
+        conn_.reset();
+    }
+}
+
+// Open the download database stored next to `path`. If it already
+// holds data for the same destination file, the number of bytes
+// stored so far is restored so the download can be resumed; rows
+// that belong to a different file are discarded.
+inline void
+DatabaseBody::value_type::open(
+    boost::filesystem::path path,
+    Config const& config,
+    boost::asio::io_service& io_service,
+    boost::system::error_code& ec)
+{
+    // Nothing below reports a failure through ec; clear it so the
+    // caller never observes a stale value.
+    ec = {};
+
+    strand_.reset(new boost::asio::io_service::strand(io_service));
+
+    auto setup = setup_DatabaseCon(config);
+    setup.dataDir = path.parent_path();
+
+    conn_ = std::make_unique<DatabaseCon>(
+        setup, "Download", DownloaderDBPragma, DatabaseBodyDBInit);
+
+    path_ = path;
+
+    auto db = conn_->checkoutDb();
+
+    boost::optional<std::string> pathFromDb;
+
+    *db << "SELECT Path FROM Download WHERE Part=0;", soci::into(pathFromDb);
+
+    // Try to reuse preexisting
+    // database.
+    if (pathFromDb)
+    {
+        // Can't reuse - database was from a different file
+        // download. Empty the table rather than dropping it:
+        // subsequent writes still need the table to exist.
+        if (pathFromDb != path.string())
+        {
+            *db << "DELETE FROM Download;";
+        }
+
+        // Continuing a file download.
+        else
+        {
+            boost::optional<uint64_t> size;
+
+            *db << "SELECT SUM(LENGTH(Data)) FROM Download;", soci::into(size);
+
+            if (size)
+                file_size_ = size.get();
+        }
+    }
+}
+
+// This is called from message::payload_size
+inline std::uint64_t
+DatabaseBody::size(value_type const& body)
+{
+    // Forward the call to the body
+    return body.size();
+}
+
+// We don't do much in the reader constructor since the
+// database is already open.
+//
+template <bool isRequest, class Fields>
+DatabaseBody::reader::reader(
+    boost::beast::http::header<isRequest, Fields>&,
+    value_type& body)
+    : body_(body)
+{
+}
+
+// We don't do anything with content_length but a sophisticated
+// application might check available space on the device
+// to see if there is enough room to store the body.
+inline void
+DatabaseBody::reader::init(
+    boost::optional<std::uint64_t> const& /*content_length*/,
+    boost::system::error_code& ec)
+{
+    // The connection must already be available for writing
+    assert(body_.conn_);
+
+    // The error_code specification requires that we
+    // either set the error to some value, or set it
+    // to indicate no error.
+    //
+    // We don't do anything fancy so set "no error"
+    ec = {};
+}
+
+// This will get called one or more times with body buffers
+//
+template <class ConstBufferSequence>
+std::size_t
+DatabaseBody::reader::put(
+    ConstBufferSequence const& buffers,
+    boost::system::error_code& ec)
+{
+    // This function must return the total number of
+    // bytes transferred from the input buffers.
+    std::size_t nwritten = 0;
+
+    // Loop over all the buffers in the sequence,
+    // and write each one to the database.
+    for (auto it = buffer_sequence_begin(buffers);
+         it != buffer_sequence_end(buffers);
+         ++it)
+    {
+        boost::asio::const_buffer buffer = *it;
+
+        body_.batch_.append(
+            static_cast<const char*>(buffer.data()), buffer.size());
+
+        // Once the batch exceeds FLUSH_SIZE, hand it to the strand
+        // to be written to the database. If MAX_HANDLERS writes are
+        // already outstanding, keep accumulating instead.
+        if (body_.batch_.size() > FLUSH_SIZE)
+        {
+            bool post = true;
+
+            {
+                std::lock_guard lock(body_.m_);
+
+                if (body_.handler_count_ >= MAX_HANDLERS)
+                    post = false;
+                else
+                    ++body_.handler_count_;
+            }
+
+            if (post)
+            {
+                body_.strand_->post(
+                    [data = body_.batch_, this] { this->do_put(data); });
+
+                body_.batch_.clear();
+            }
+        }
+
+        nwritten += it->size();
+    }
+
+    // Indicate success
+    // This is required by the error_code specification
+    ec = {};
+
+    return nwritten;
+}
+
+// Runs on the strand: appends `data` to the Download table, starting
+// a new row (part) whenever the current row's blob is full.
+inline void
+DatabaseBody::reader::do_put(std::string data)
+{
+    using namespace boost::asio;
+
+    {
+        std::unique_lock lock(body_.m_);
+
+        // The download is being halted.
+        if (body_.closing_)
+        {
+            if (--body_.handler_count_ == 0)
+            {
+                lock.unlock();
+                body_.c_.notify_one();
+            }
+
+            return;
+        }
+    }
+
+    auto path = body_.path_.string();
+    uint64_t rowSize;
+    soci::indicator rti;
+
+    uint64_t remainingInRow;
+
+    auto db = body_.conn_->checkoutDb();
+
+    auto be = dynamic_cast<soci::sqlite3_session_backend*>(db->get_backend());
+    BOOST_ASSERT(be);
+
+    // This limits how large we can make the blob
+    // in each row. Also subtract a pad value to
+    // account for the other values in the row.
+    auto const blobMaxSize =
+        sqlite_api::sqlite3_limit(be->conn_, SQLITE_LIMIT_LENGTH, -1) -
+        MAX_ROW_SIZE_PAD;
+
+    auto rowInit = [&] {
+        *db << "INSERT INTO Download VALUES (:path, zeroblob(0), 0, :part)",
+            soci::use(path), soci::use(body_.part_);
+
+        remainingInRow = blobMaxSize;
+        rowSize = 0;
+    };
+
+    *db << "SELECT Path,Size,Part FROM Download ORDER BY Part DESC "
+           "LIMIT 1",
+        soci::into(path), soci::into(rowSize), soci::into(body_.part_, rti);
+
+    if (!db->got_data())
+        rowInit();
+    else
+        remainingInRow = blobMaxSize - rowSize;
+
+    auto insert = [&db, &rowSize, &part = body_.part_, &fs = body_.file_size_](
+                      auto const& data) {
+        uint64_t updatedSize = rowSize + data.size();
+
+        *db << "UPDATE Download SET Data = CAST(Data || :data AS blob), "
+               "Size = :size WHERE Part = :part;",
+            soci::use(data), soci::use(updatedSize), soci::use(part);
+
+        fs += data.size();
+    };
+
+    while (remainingInRow < data.size())
+    {
+        if (remainingInRow)
+        {
+            insert(data.substr(0, remainingInRow));
+            data.erase(0, remainingInRow);
+        }
+
+        ++body_.part_;
+        rowInit();
+    }
+
+    insert(data);
+
+    bool const notify = [this] {
+        std::lock_guard lock(body_.m_);
+        return --body_.handler_count_ == 0;
+    }();
+
+    if (notify)
+        body_.c_.notify_one();
+}
+
+// Called after writing is done when there's no error.
+inline void
+DatabaseBody::reader::finish(boost::system::error_code& ec)
+{
+    {
+        std::unique_lock lock(body_.m_);
+
+        // Wait for scheduled DB writes
+        // to complete.
+        if (body_.handler_count_)
+        {
+            auto predicate = [&] { return !body_.handler_count_; };
+            body_.c_.wait(lock, predicate);
+        }
+    }
+
+    auto db = body_.conn_->checkoutDb();
+
+    soci::rowset<std::string> rs =
+        (db->prepare << "SELECT Data FROM Download ORDER BY PART ASC;");
+
+    std::ofstream fout;
+    fout.open(body_.path_.string(), std::ios::binary | std::ios::out);
+
+    // The destination file could not be opened: report the
+    // failure instead of silently discarding the download.
+    if (!fout)
+    {
+        ec = boost::system::errc::make_error_code(
+            boost::system::errc::io_error);
+        return;
+    }
+
+    // iteration through the resultset:
+    for (auto it = rs.begin(); it != rs.end(); ++it)
+        fout.write(it->data(), it->size());
+
+    // Flush any pending data that hasn't
+    // been written to the DB.
+    if (body_.batch_.size())
+    {
+        fout.write(body_.batch_.data(), body_.batch_.size());
+        body_.batch_.clear();
+    }
+
+    fout.close();
+
+    // The error_code specification requires ec to be assigned
+    // either way: an error if writing or closing failed,
+    // otherwise "no error".
+    ec = {};
+    if (fout.fail())
+    {
+        ec = boost::system::errc::make_error_code(
+            boost::system::errc::io_error);
+    }
+}
+
+} // namespace ripple
diff --git a/src/ripple/net/impl/DatabaseDownloader.cpp b/src/ripple/net/impl/DatabaseDownloader.cpp
new file mode 100644
index 000000000..46f82d6a7
--- /dev/null
+++ b/src/ripple/net/impl/DatabaseDownloader.cpp
@@ -0,0 +1,88 @@
+//------------------------------------------------------------------------------
+/*
+    This file is part of rippled: https://github.com/ripple/rippled
+    Copyright (c) 2020 Ripple Labs Inc.
+
+    Permission to use, copy, modify, and/or distribute this software for any
+    purpose with or without fee is hereby granted, provided that the above
+    copyright notice and this permission notice appear in all copies.
+
+    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+    ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/ +//============================================================================== + +#include + +namespace ripple +{ + +DatabaseDownloader::DatabaseDownloader( + boost::asio::io_service & io_service, + beast::Journal j, + Config const & config) + : SSLHTTPDownloader(io_service, j, config) + , config_(config) + , io_service_(io_service) +{ +} + +auto +DatabaseDownloader::getParser(boost::filesystem::path dstPath, + std::function complete, + boost::system::error_code & ec) -> std::shared_ptr +{ + using namespace boost::beast; + + auto p = std::make_shared>(); + p->body_limit(std::numeric_limits::max()); + p->get().body().open( + dstPath, + config_, + io_service_, + ec); + if(ec) + { + p->get().body().close(); + fail(dstPath, complete, ec, "open"); + } + + return p; +} + +bool +DatabaseDownloader::checkPath(boost::filesystem::path const & dstPath) +{ + return dstPath.string().size() <= MAX_PATH_LEN; +} + +void +DatabaseDownloader::closeBody(std::shared_ptr p) +{ + using namespace boost::beast; + + auto databaseBodyParser = std::dynamic_pointer_cast< + http::response_parser>(p); + assert(databaseBodyParser); + + databaseBodyParser->get().body().close(); +} + +uint64_t +DatabaseDownloader::size(std::shared_ptr p) +{ + using namespace boost::beast; + + auto databaseBodyParser = std::dynamic_pointer_cast< + http::response_parser>(p); + assert(databaseBodyParser); + + return databaseBodyParser->get().body().size(); +} + +} // ripple diff --git a/src/ripple/net/impl/SSLHTTPDownloader.cpp b/src/ripple/net/impl/SSLHTTPDownloader.cpp index bf8f9962b..3bf0622fa 100644 --- a/src/ripple/net/impl/SSLHTTPDownloader.cpp +++ b/src/ripple/net/impl/SSLHTTPDownloader.cpp @@ -25,10 +25,13 @@ namespace ripple { SSLHTTPDownloader::SSLHTTPDownloader( boost::asio::io_service& io_service, beast::Journal j, - Config const& config) - : ssl_ctx_(config, j, boost::asio::ssl::context::tlsv12_client) + Config const& config, + bool isPaused) + : j_(j) + , ssl_ctx_(config, j, 
boost::asio::ssl::context::tlsv12_client) , strand_(io_service) - , j_(j) + , isStopped_(false) + , sessionActive_(false) { } @@ -41,21 +44,8 @@ SSLHTTPDownloader::download( boost::filesystem::path const& dstPath, std::function complete) { - try - { - if (exists(dstPath)) - { - JLOG(j_.error()) << - "Destination file exists"; - return false; - } - } - catch (std::exception const& e) - { - JLOG(j_.error()) << - "exception: " << e.what(); + if (!checkPath(dstPath)) return false; - } if (!strand_.running_in_this_thread()) strand_.post( @@ -84,6 +74,24 @@ SSLHTTPDownloader::download( return true; } +void +SSLHTTPDownloader::onStop() +{ + std::unique_lock lock(m_); + + isStopped_ = true; + + if(sessionActive_) + { + // Wait for the handler to exit. + c_.wait(lock, + [this]() + { + return !sessionActive_; + }); + } +} + void SSLHTTPDownloader::do_session( std::string const host, @@ -98,126 +106,231 @@ SSLHTTPDownloader::do_session( using namespace boost::beast; boost::system::error_code ec; - ip::tcp::resolver resolver {strand_.context()}; - auto const results = resolver.async_resolve(host, port, yield[ec]); - if (ec) - return fail(dstPath, complete, ec, "async_resolve"); + bool skip = false; - try + ////////////////////////////////////////////// + // Define lambdas for encapsulating download + // operations: + auto connect = [&](std::shared_ptr parser) { - stream_.emplace(strand_.context(), ssl_ctx_.context()); - } - catch (std::exception const& e) - { - return fail(dstPath, complete, ec, - std::string("exception: ") + e.what()); - } + uint64_t const rangeStart = size(parser); - ec = ssl_ctx_.preConnectVerify(*stream_, host); - if (ec) - return fail(dstPath, complete, ec, "preConnectVerify"); + ip::tcp::resolver resolver {strand_.context()}; + auto const results = resolver.async_resolve(host, port, yield[ec]); + if (ec) + return fail(dstPath, complete, ec, "async_resolve", parser); - boost::asio::async_connect( - stream_->next_layer(), results.begin(), results.end(), 
yield[ec]); - if (ec) - return fail(dstPath, complete, ec, "async_connect"); - - ec = ssl_ctx_.postConnectVerify(*stream_, host); - if (ec) - return fail(dstPath, complete, ec, "postConnectVerify"); - - stream_->async_handshake(ssl::stream_base::client, yield[ec]); - if (ec) - return fail(dstPath, complete, ec, "async_handshake"); - - // Set up an HTTP HEAD request message to find the file size - http::request req {http::verb::head, target, version}; - req.set(http::field::host, host); - req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); - - http::async_write(*stream_, req, yield[ec]); - if(ec) - return fail(dstPath, complete, ec, "async_write"); - - { - // Check if available storage for file size - http::response_parser p; - p.skip(true); - http::async_read(*stream_, read_buf_, p, yield[ec]); - if(ec) - return fail(dstPath, complete, ec, "async_read"); - if (auto len = p.content_length()) + try { - try + stream_.emplace(strand_.context(), ssl_ctx_.context()); + } + catch (std::exception const& e) + { + return fail(dstPath, complete, ec, + std::string("exception: ") + e.what(), parser); + } + + ec = ssl_ctx_.preConnectVerify(*stream_, host); + if (ec) + return fail(dstPath, complete, ec, "preConnectVerify", parser); + + boost::asio::async_connect( + stream_->next_layer(), results.begin(), results.end(), yield[ec]); + if (ec) + return fail(dstPath, complete, ec, "async_connect", parser); + + ec = ssl_ctx_.postConnectVerify(*stream_, host); + if (ec) + return fail(dstPath, complete, ec, "postConnectVerify", parser); + + stream_->async_handshake(ssl::stream_base::client, yield[ec]); + if (ec) + return fail(dstPath, complete, ec, "async_handshake", parser); + + // Set up an HTTP HEAD request message to find the file size + http::request req {http::verb::head, target, version}; + req.set(http::field::host, host); + req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); + + // Requesting a portion of the file + if (rangeStart) + { + 
req.set(http::field::range, + (boost::format("bytes=%llu-") % rangeStart).str()); + } + + http::async_write(*stream_, req, yield[ec]); + if(ec) + return fail(dstPath, complete, ec, "async_write", parser); + + { + // Check if available storage for file size + http::response_parser p; + p.skip(true); + http::async_read(*stream_, read_buf_, p, yield[ec]); + if(ec) + return fail(dstPath, complete, ec, "async_read", parser); + + // Range request was rejected + if(p.get().result() == http::status::range_not_satisfiable) { - if (*len > space(dstPath.parent_path()).available) + req.erase(http::field::range); + + http::async_write(*stream_, req, yield[ec]); + if(ec) + return fail(dstPath, complete, ec, + "async_write_range_verify", parser); + + http::response_parser p; + p.skip(true); + + http::async_read(*stream_, read_buf_, p, yield[ec]); + if(ec) + return fail(dstPath, complete, ec, + "async_read_range_verify", parser); + + // The entire file is downloaded already. + if(p.content_length() == rangeStart) + skip = true; + else + return fail(dstPath, complete, ec, + "range_not_satisfiable", parser); + } + else if (rangeStart && + p.get().result() != http::status::partial_content) + { + ec.assign(boost::system::errc::not_supported, + boost::system::generic_category()); + + return fail(dstPath, complete, ec, + "Range request ignored", parser); + } + else if (auto len = p.content_length()) + { + try + { + if (*len > space(dstPath.parent_path()).available) + { + return fail(dstPath, complete, ec, + "Insufficient disk space for download", parser); + } + } + catch (std::exception const& e) { return fail(dstPath, complete, ec, - "Insufficient disk space for download"); + std::string("exception: ") + e.what(), parser); } } - catch (std::exception const& e) + } + + if(!skip) + { + // Set up an HTTP GET request message to download the file + req.method(http::verb::get); + + if (rangeStart) { - return fail(dstPath, complete, ec, - std::string("exception: ") + e.what()); + 
req.set(http::field::range, + (boost::format("bytes=%llu-") % rangeStart).str()); } } + + http::async_write(*stream_, req, yield[ec]); + if(ec) + return fail(dstPath, complete, ec, "async_write", parser); + + return true; + }; + + auto close = [&](auto p) + { + closeBody(p); + + // Gracefully close the stream + stream_->async_shutdown(yield[ec]); + if (ec == boost::asio::error::eof) + ec.assign(0, ec.category()); + if (ec) + { + // Most web servers don't bother with performing + // the SSL shutdown handshake, for speed. + JLOG(j_.trace()) << + "async_shutdown: " << ec.message(); + } + // The socket cannot be reused + stream_ = boost::none; + }; + + auto getParser = [&] + { + auto p = this->getParser(dstPath, complete, ec); + if (ec) + fail(dstPath, complete, ec, "getParser", p); + + return p; + }; + + // When the downloader is being stopped + // because the server is shutting down, + // this method notifies a 'Stoppable' + // object that the session has ended. + auto exit = [this]() + { + std::lock_guard lock(m_); + sessionActive_ = false; + c_.notify_one(); + }; + + // end lambdas + //////////////////////////////////////////////////////////// + + { + std::lock_guard lock(m_); + sessionActive_ = true; } - // Set up an HTTP GET request message to download the file - req.method(http::verb::get); - http::async_write(*stream_, req, yield[ec]); - if(ec) - return fail(dstPath, complete, ec, "async_write"); + if(isStopped_.load()) + return exit(); + + auto p = getParser(); + if (ec) + return exit(); + + if (!connect(p) || ec) + return exit(); + + if(skip) + p->skip(true); // Download the file - http::response_parser p; - p.body_limit(std::numeric_limits::max()); - p.get().body().open( - dstPath.string().c_str(), - boost::beast::file_mode::write, - ec); - if (ec) + while (!p->is_done()) { - p.get().body().close(); - return fail(dstPath, complete, ec, "open"); - } + if(isStopped_.load()) + { + close(p); + return exit(); + } - http::async_read(*stream_, read_buf_, p, 
yield[ec]); - if (ec) - { - p.get().body().close(); - return fail(dstPath, complete, ec, "async_read"); + http::async_read_some(*stream_, read_buf_, *p, yield[ec]); } - p.get().body().close(); - - // Gracefully close the stream - stream_->async_shutdown(yield[ec]); - if (ec == boost::asio::error::eof) - ec.assign(0, ec.category()); - if (ec) - { - // Most web servers don't bother with performing - // the SSL shutdown handshake, for speed. - JLOG(j_.trace()) << - "async_shutdown: " << ec.message(); - } - // The socket cannot be reused - stream_ = boost::none; JLOG(j_.trace()) << "download completed: " << dstPath.string(); + close(p); + exit(); + // Notify the completion handler complete(std::move(dstPath)); } -void +bool SSLHTTPDownloader::fail( boost::filesystem::path dstPath, std::function const& complete, boost::system::error_code const& ec, - std::string const& errMsg) + std::string const& errMsg, + std::shared_ptr parser) { if (!ec) { @@ -230,18 +343,21 @@ SSLHTTPDownloader::fail( errMsg << ": " << ec.message(); } + if (parser) + closeBody(parser); + try { remove(dstPath); } catch (std::exception const& e) { - JLOG(j_.error()) << - "exception: " << e.what(); + JLOG(j_.error()) << "exception: " << e.what() + << " in function: " << __func__; } complete(std::move(dstPath)); + + return false; } - - }// ripple diff --git a/src/ripple/net/uml/interrupt_sequence.pu b/src/ripple/net/uml/interrupt_sequence.pu new file mode 100644 index 000000000..ba046d084 --- /dev/null +++ b/src/ripple/net/uml/interrupt_sequence.pu @@ -0,0 +1,233 @@ +@startuml + + +skinparam shadowing false + +/' +skinparam sequence { + ArrowColor #e1e4e8 + ActorBorderColor #e1e4e8 + DatabaseBorderColor #e1e4e8 + LifeLineBorderColor Black + LifeLineBackgroundColor #d3d6d9 + + ParticipantBorderColor DeepSkyBlue + ParticipantBackgroundColor DodgerBlue + ParticipantFontName Impact + ParticipantFontSize 17 + ParticipantFontColor #A9DCDF + + NoteBackgroundColor #6a737d + + ActorBackgroundColor #f6f8fa + 
ActorFontColor #6a737d + ActorFontSize 17 + ActorFontName Aapex + + EntityBackgroundColor #f6f8fa + EntityFontColor #6a737d + EntityFontSize 17 + EntityFontName Aapex + + DatabaseBackgroundColor #f6f8fa + DatabaseFontColor #6a737d + DatabaseFontSize 17 + DatabaseFontName Aapex + + CollectionsBackgroundColor #f6f8fa + ActorFontColor #6a737d + ActorFontSize 17 + ActorFontName Aapex +} + +skinparam note { + BackgroundColor #fafbfc + BorderColor #e1e4e8 +} +'/ + +'skinparam monochrome true + +actor Client as c +entity RippleNode as rn +entity ShardArchiveHandler as sa +entity SSLHTTPDownloader as d +database Database as db +collections Fileserver as s + +c -> rn: Launch RippleNode +activate rn + +c -> rn: Issue download request + +note right of c + **Download Request:** + + { + "method": "download_shard", + "params": + [ + { + "shards": + [ + {"index": 1, "url": "https://example.com/1.tar.lz4"}, + {"index": 2, "url": "https://example.com/2.tar.lz4"}, + {"index": 5, "url": "https://example.com/5.tar.lz4"} + ] + } + ] + } +end note + +rn -> sa: Create instance of Handler +activate sa + +rn -> sa: Add three downloads +sa -> sa: Validate requested downloads + +rn -> sa: Initiate Downloads +sa -> rn: ACK: Initiating +rn -> c: Initiating requested downloads + +sa -> db: Save state to the database\n(Processing three downloads) + +note right of db + + **ArchiveHandler State (SQLite Table):** + + | Index | URL | + | 1 | https://example.com/1.tar.lz4 | + | 2 | https://example.com/2.tar.lz4 | + | 5 | https://example.com/5.tar.lz4 | + +end note + +sa -> d: Create instance of Downloader +activate d + +group Download 1 + + note over sa + **Download 1:** + + This encapsulates the download of the first file + at URL "https://example.com/1.tar.lz4". 
+ + end note + + sa -> d: Start download + + d -> s: Connect and request file + s -> d: Send file + d -> sa: Invoke completion handler + +end + +sa -> sa: Import and validate shard + +sa -> db: Update persisted state\n(Remove download) + +note right of db + **ArchiveHandler State:** + + | Index | URL | + | 2 | https://example.com/2.tar.lz4 | + | 5 | https://example.com/5.tar.lz4 | + +end note + +group Download 2 + + sa -> d: Start download + + d -> s: Connect and request file + +end + +rn -> rn: **RippleNode crashes** + +deactivate sa +deactivate rn +deactivate d + +c -> rn: Restart RippleNode +activate rn + +rn -> db: Detect non-empty state database + +rn -> sa: Create instance of Handler +activate sa + +sa -> db: Load state + +note right of db + **ArchiveHandler State:** + + | Index | URL | + | 2 | https://example.com/2.tar.lz4 | + | 5 | https://example.com/5.tar.lz4 | + +end note + +sa -> d: Create instance of Downloader +activate d + +sa -> sa: Resume Download 2 + +group Download 2 + + sa -> d: Start download + + d -> s: Connect and request file + s -> d: Send file + d -> sa: Invoke completion handler + +end + +sa -> sa: Import and validate shard + +sa -> db: Update persisted state \n(Remove download) + +note right of db + **ArchiveHandler State:** + + | Index | URL | + | 5 | https://example.com/5.tar.lz4 | + +end note + +group Download 3 + + sa -> d: Start download + + d -> s: Connect and request file + s -> d: Send file + d -> sa: Invoke completion handler + +end + +sa -> sa: Import and validate shard + +sa -> db: Update persisted state \n(Remove download) + +note right of db + **ArchiveHandler State:** + + ***empty*** + +end note + +sa -> db: Remove empty database + +sa -> sa: Automatically destroyed +deactivate sa + +d -> d: Destroyed via reference\ncounting +deactivate d + +c -> rn: Poll RippleNode to verify successful\nimport of all requested shards.
+c -> rn: Shutdown RippleNode + +deactivate rn + +@enduml diff --git a/src/ripple/net/uml/states.pu b/src/ripple/net/uml/states.pu new file mode 100644 index 000000000..b5db8ee48 --- /dev/null +++ b/src/ripple/net/uml/states.pu @@ -0,0 +1,69 @@ +@startuml + +state "Updating Database" as UD4 { + UD4: Update the database to reflect + UD4: the current state. +} +state "Initiating Download" as ID { + ID: Omit the range header to download + ID: the entire file. +} + +state "Evaluate Database" as ED { + ED: Determine the current state + ED: based on the contents of the + ED: database from a previous run. +} + +state "Remove Database" as RD { + RD: The database is destroyed when + RD: empty. +} + +state "Download in Progress" as DP + +state "Download Completed" as DC { + + state "Updating Database" as UD { + UD: Update the database to reflect + UD: the current state. + } + + state "Queue Check" as QC { + QC: Check the queue for any remaining + QC: downloads. + } + + [*] --> UD + UD --> QC +} + +state "Check Resume" as CR { + CR: Determine whether we're resuming + CR: a previous download or starting a + CR: new one. +} + +state "Resuming Download" as IPD { + IPD: Set the range header in the + IPD: HTTP request as needed.
+} + +[*] --> ED : State DB is present at\nnode launch +ED --> RD : State DB is empty +ED --> CR : There are downloads queued +RD --> [*] + +[*] --> UD4 : Client invokes <>\ncommand +UD4 --> ID : Database updated +ID --> DP : Download started +DP --> DC : Download completed +DC --> ID : There **are** additional downloads\nqueued +DP --> [*] : A graceful shutdown is\nin progress +DC --> RD : There **are no** additional\ndownloads queued + +CR --> IPD : Resuming an interrupted\ndownload +IPD --> DP: Download started +CR --> ID : Initiating a new\ndownload + +@enduml diff --git a/src/ripple/rpc/ShardArchiveHandler.h b/src/ripple/rpc/ShardArchiveHandler.h index f1026603a..73ec512ad 100644 --- a/src/ripple/rpc/ShardArchiveHandler.h +++ b/src/ripple/rpc/ShardArchiveHandler.h @@ -23,27 +23,76 @@ #include #include #include -#include +#include #include #include namespace ripple { +namespace test { class ShardArchiveHandler_test; } namespace RPC { /** Handles the download and import one or more shard archives. */ class ShardArchiveHandler - : public std::enable_shared_from_this + : public Stoppable + , public std::enable_shared_from_this { public: + + using pointer = std::shared_ptr; + friend class test::ShardArchiveHandler_test; + + static + boost::filesystem::path + getDownloadDirectory(Config const& config); + + static + pointer + getInstance(); + + static + pointer + getInstance(Application& app, Stoppable& parent); + + static + pointer + recoverInstance(Application& app, Stoppable& parent); + + static + bool + hasInstance(); + + bool + init(); + + bool + initFromDB(); + + ~ShardArchiveHandler() = default; + + bool + add(std::uint32_t shardIndex, std::pair&& url); + + /** Starts downloading and importing archives. 
*/ + bool + start(); + + void + release(); + +private: + ShardArchiveHandler() = delete; ShardArchiveHandler(ShardArchiveHandler const&) = delete; ShardArchiveHandler& operator= (ShardArchiveHandler&&) = delete; ShardArchiveHandler& operator= (ShardArchiveHandler const&) = delete; - ShardArchiveHandler(Application& app); + ShardArchiveHandler( + Application& app, + Stoppable& parent, + bool recovery = false); - ~ShardArchiveHandler(); + void onStop () override; /** Add an archive to be downloaded and imported. @param shardIndex the index of the shard to be imported. @@ -52,13 +101,9 @@ public: @note Returns false if called while downloading. */ bool - add(std::uint32_t shardIndex, parsedURL&& url); + add(std::uint32_t shardIndex, parsedURL&& url, + std::lock_guard const&); - /** Starts downloading and importing archives. */ - bool - start(); - -private: // Begins the download and import of the next archive. bool next(std::lock_guard& l); @@ -75,14 +120,21 @@ private: void remove(std::lock_guard&); + void + doRelease(std::lock_guard const&); + + static std::mutex instance_mutex_; + static pointer instance_; + std::mutex mutable m_; Application& app_; - std::shared_ptr downloader_; + beast::Journal const j_; + std::unique_ptr sqliteDB_; + std::shared_ptr downloader_; boost::filesystem::path const downloadDir_; boost::asio::basic_waitable_timer timer_; bool process_; std::map archives_; - beast::Journal const j_; }; } // RPC diff --git a/src/ripple/rpc/handlers/DownloadShard.cpp b/src/ripple/rpc/handlers/DownloadShard.cpp index 174867ce1..02ff8e6df 100644 --- a/src/ripple/rpc/handlers/DownloadShard.cpp +++ b/src/ripple/rpc/handlers/DownloadShard.cpp @@ -77,7 +77,7 @@ doDownloadShard(RPC::JsonContext& context) // Validate shards static const std::string ext {".tar.lz4"}; - std::map archives; + std::map> archives; for (auto& it : context.params[jss::shards]) { // Validate the index @@ -94,7 +94,8 @@ doDownloadShard(RPC::JsonContext& context) if (!it.isMember(jss::url)) 
return RPC::missing_field_error(jss::url); parsedURL url; - if (!parseUrl(url, it[jss::url].asString()) || + auto unparsedURL = it[jss::url].asString(); + if (!parseUrl(url, unparsedURL) || url.domain.empty() || url.path.empty()) { return RPC::invalid_field_error(jss::url); @@ -116,16 +117,39 @@ doDownloadShard(RPC::JsonContext& context) } // Check for duplicate indexes - if (!archives.emplace(jv.asUInt(), std::move(url)).second) + if (!archives.emplace(jv.asUInt(), + std::make_pair(std::move(url), unparsedURL)).second) { return RPC::make_param_error("Invalid field '" + std::string(jss::index) + "', duplicate shard ids."); } } - // Begin downloading. The handler keeps itself alive while downloading. - auto handler { - std::make_shared(context.app)}; + RPC::ShardArchiveHandler::pointer handler; + + try + { + handler = RPC::ShardArchiveHandler::hasInstance() ? + RPC::ShardArchiveHandler::getInstance() : + RPC::ShardArchiveHandler::getInstance( + context.app, + context.app.getJobQueue()); + + if(!handler) + return RPC::make_error (rpcINTERNAL, + "Failed to create ShardArchiveHandler."); + + if(!handler->init()) + return RPC::make_error (rpcINTERNAL, + "Failed to initiate ShardArchiveHandler."); + } + catch (std::exception const& e) + { + return RPC::make_error (rpcINTERNAL, + std::string("Failed to start download: ") + + e.what()); + } + for (auto& [index, url] : archives) { if (!handler->add(index, std::move(url))) @@ -135,8 +159,13 @@ doDownloadShard(RPC::JsonContext& context) std::to_string(index) + " exists or being acquired"); } } + + // Begin downloading. 
if (!handler->start()) + { + handler->release(); return rpcError(rpcINTERNAL); + } std::string s {"Downloading shard"}; preShards = shardStore->getPreShards(); diff --git a/src/ripple/rpc/handlers/Handlers.h b/src/ripple/rpc/handlers/Handlers.h index 805e15640..edf0acbdf 100644 --- a/src/ripple/rpc/handlers/Handlers.h +++ b/src/ripple/rpc/handlers/Handlers.h @@ -61,10 +61,12 @@ Json::Value doManifest (RPC::JsonContext&); Json::Value doNoRippleCheck (RPC::JsonContext&); Json::Value doOwnerInfo (RPC::JsonContext&); Json::Value doPathFind (RPC::JsonContext&); +Json::Value doPause (RPC::JsonContext&); Json::Value doPeers (RPC::JsonContext&); Json::Value doPing (RPC::JsonContext&); Json::Value doPrint (RPC::JsonContext&); Json::Value doRandom (RPC::JsonContext&); +Json::Value doResume (RPC::JsonContext&); Json::Value doPeerReservationsAdd (RPC::JsonContext&); Json::Value doPeerReservationsDel (RPC::JsonContext&); Json::Value doPeerReservationsList (RPC::JsonContext&); diff --git a/src/ripple/rpc/impl/ShardArchiveHandler.cpp b/src/ripple/rpc/impl/ShardArchiveHandler.cpp index 1405b52da..fda926ef2 100644 --- a/src/ripple/rpc/impl/ShardArchiveHandler.cpp +++ b/src/ripple/rpc/impl/ShardArchiveHandler.cpp @@ -22,6 +22,9 @@ #include #include #include +#include +#include +#include #include @@ -31,41 +34,192 @@ namespace RPC { using namespace boost::filesystem; using namespace std::chrono_literals; -ShardArchiveHandler::ShardArchiveHandler(Application& app) - : app_(app) - , downloadDir_(get(app_.config().section( - ConfigSection::shardDatabase()), "path", "") + "/download") - , timer_(app_.getIOService()) - , process_(false) - , j_(app.journal("ShardArchiveHandler")) +std::mutex ShardArchiveHandler::instance_mutex_; +ShardArchiveHandler::pointer ShardArchiveHandler::instance_ = nullptr; + +boost::filesystem::path +ShardArchiveHandler::getDownloadDirectory(Config const& config) { - assert(app_.getShardStore()); + return get(config.section( + ConfigSection::shardDatabase()), 
"download_path", + get(config.section(ConfigSection::shardDatabase()), + "path", "")) / "download"; } -ShardArchiveHandler::~ShardArchiveHandler() +auto +ShardArchiveHandler::getInstance() -> pointer { - std::lock_guard lock(m_); - timer_.cancel(); - for (auto const& ar : archives_) - app_.getShardStore()->removePreShard(ar.first); - archives_.clear(); + std::lock_guard lock(instance_mutex_); - // Remove temp root download directory - try - { - remove_all(downloadDir_); - } - catch (std::exception const& e) - { - JLOG(j_.error()) << - "exception: " << e.what(); - } + return instance_; +} + +auto +ShardArchiveHandler::getInstance(Application& app, + Stoppable& parent) -> pointer +{ + std::lock_guard lock(instance_mutex_); + assert(!instance_); + + instance_.reset(new ShardArchiveHandler(app, parent)); + + return instance_; +} + +auto +ShardArchiveHandler::recoverInstance(Application& app, Stoppable& parent) -> pointer +{ + std::lock_guard lock(instance_mutex_); + assert(!instance_); + + instance_.reset(new ShardArchiveHandler(app, parent, true)); + + return instance_; } bool -ShardArchiveHandler::add(std::uint32_t shardIndex, parsedURL&& url) +ShardArchiveHandler::hasInstance() +{ + std::lock_guard lock(instance_mutex_); + + return instance_.get() != nullptr; +} + +ShardArchiveHandler::ShardArchiveHandler( + Application& app, + Stoppable& parent, + bool recovery) + : Stoppable("ShardArchiveHandler", parent) + , app_(app) + , j_(app.journal("ShardArchiveHandler")) + , downloadDir_(getDownloadDirectory(app.config())) + , timer_(app_.getIOService()) + , process_(false) +{ + assert(app_.getShardStore()); + + if(recovery) + downloader_.reset(new DatabaseDownloader ( + app_.getIOService(), j_, app_.config())); +} + +bool +ShardArchiveHandler::init() +{ + try + { + create_directories(downloadDir_); + + sqliteDB_ = std::make_unique( + downloadDir_ , + stateDBName, + DownloaderDBPragma, + ShardArchiveHandlerDBInit); + } + catch(std::exception const& e) + { + JLOG(j_.error()) 
<< "exception: " << e.what() + << " in function: " << __func__; + + return false; + } + + return true; +} + +bool +ShardArchiveHandler::initFromDB() +{ + try + { + using namespace boost::filesystem; + + assert(exists(downloadDir_ / stateDBName) && + is_regular_file(downloadDir_ / stateDBName)); + + sqliteDB_ = std::make_unique( + downloadDir_, + stateDBName, + DownloaderDBPragma, + ShardArchiveHandlerDBInit); + + auto& session{sqliteDB_->getSession()}; + + soci::rowset rs = (session.prepare + << "SELECT * FROM State;"); + + std::lock_guard lock(m_); + + for (auto it = rs.begin(); it != rs.end(); ++it) + { + parsedURL url; + + if (!parseUrl(url, it->get(1))) + { + JLOG(j_.error()) << "Failed to parse url: " + << it->get(1); + + continue; + } + + add(it->get(0), std::move(url), lock); + } + + // Failed to load anything + // from the state database. m_ is already held here, so use doRelease(); release() would re-lock m_. + if(archives_.empty()) + { + doRelease(lock); + return false; + } + } + catch(std::exception const& e) + { + JLOG(j_.error()) << "exception: " << e.what() + << " in function: " << __func__; + + return false; + } + + return true; +} + +void +ShardArchiveHandler::onStop() +{ + std::lock_guard lock(m_); + + if (downloader_) + { + downloader_->onStop(); + downloader_.reset(); + } + + stopped(); +} + +bool +ShardArchiveHandler::add(std::uint32_t shardIndex, + std::pair&& url) +{ + std::lock_guard lock(m_); + + if (!add(shardIndex, std::forward(url.first), lock)) + return false; + + auto& session{sqliteDB_->getSession()}; + + session << "INSERT INTO State VALUES (:index, :url);", + soci::use(shardIndex), + soci::use(url.second); + + return true; +} + +bool +ShardArchiveHandler::add(std::uint32_t shardIndex, parsedURL&& url, + std::lock_guard const&) { - std::lock_guard lock(m_); if (process_) { JLOG(j_.error()) << @@ -76,9 +230,12 @@ ShardArchiveHandler::add(std::uint32_t shardIndex, parsedURL&& url) auto const it {archives_.find(shardIndex)}; if (it != archives_.end()) return url == it->second; + if
(!app_.getShardStore()->prepareShard(shardIndex)) return false; + archives_.emplace(shardIndex, std::move(url)); + return true; } @@ -107,16 +264,13 @@ ShardArchiveHandler::start() try { - // Remove if remnant from a crash - remove_all(downloadDir_); - // Create temp root download directory - create_directory(downloadDir_); + create_directories(downloadDir_); if (!downloader_) { // will throw if can't initialize ssl context - downloader_ = std::make_shared( + downloader_ = std::make_shared( app_.getIOService(), j_, app_.config()); } } @@ -130,12 +284,19 @@ ShardArchiveHandler::start() return next(lock); } +void +ShardArchiveHandler::release() +{ + std::lock_guard lock(m_); + doRelease(lock); +} + bool ShardArchiveHandler::next(std::lock_guard& l) { if (archives_.empty()) { - process_ = false; + doRelease(l); return false; } @@ -154,20 +315,28 @@ ShardArchiveHandler::next(std::lock_guard& l) return next(l); } - // Download the archive + // Download the archive. Process in another thread + // to prevent holding up the lock if the downloader + // sleeps. 
auto const& url {archives_.begin()->second}; - if (!downloader_->download( - url.domain, - std::to_string(url.port.get_value_or(443)), - url.path, - 11, - dstDir / "archive.tar.lz4", - std::bind(&ShardArchiveHandler::complete, - shared_from_this(), std::placeholders::_1))) - { - remove(l); - return next(l); - } + app_.getJobQueue().addJob( + jtCLIENT, "ShardArchiveHandler", + [this, ptr = shared_from_this(), url, dstDir](Job&) + { + if (!downloader_->download( + url.domain, + std::to_string(url.port.get_value_or(443)), + url.path, + 11, + dstDir / "archive.tar.lz4", + std::bind(&ShardArchiveHandler::complete, + ptr, std::placeholders::_1))) + { + std::lock_guard l(m_); + remove(l); + next(l); + } + }); process_ = true; return true; @@ -206,7 +375,7 @@ ShardArchiveHandler::complete(path dstPath) jtCLIENT, "ShardArchiveHandler", [=, dstPath = std::move(dstPath), ptr = shared_from_this()](Job&) { - // If validating and not synced then defer and retry + // If not synced then defer and retry auto const mode {ptr->app_.getOPs().getOperatingMode()}; if (mode != OperatingMode::FULL) { @@ -282,6 +451,11 @@ ShardArchiveHandler::remove(std::lock_guard&) app_.getShardStore()->removePreShard(shardIndex); archives_.erase(shardIndex); + auto& session{sqliteDB_->getSession()}; + + session << "DELETE FROM State WHERE ShardIndex = :index;", + soci::use(shardIndex); + auto const dstDir {downloadDir_ / std::to_string(shardIndex)}; try { @@ -294,5 +468,37 @@ ShardArchiveHandler::remove(std::lock_guard&) } } +void +ShardArchiveHandler::doRelease(std::lock_guard const&) +{ + process_ = false; + + timer_.cancel(); + for (auto const& ar : archives_) + app_.getShardStore()->removePreShard(ar.first); + archives_.clear(); + + { + auto& session{sqliteDB_->getSession()}; + + session << "DROP TABLE State;"; + } + + sqliteDB_.reset(); + + // Remove temp root download directory + try + { + remove_all(downloadDir_); + } + catch (std::exception const& e) + { + JLOG(j_.error()) << "exception: " << 
e.what() + << " in function: " << __func__; + } + + downloader_.reset(); +} + } // RPC } // ripple diff --git a/src/test/net/SSLHTTPDownloader_test.cpp b/src/test/net/SSLHTTPDownloader_test.cpp index f8421e7d2..04726137a 100644 --- a/src/test/net/SSLHTTPDownloader_test.cpp +++ b/src/test/net/SSLHTTPDownloader_test.cpp @@ -17,11 +17,10 @@ */ //============================================================================== -#include +#include #include #include #include -#include #include #include #include @@ -82,15 +81,15 @@ class SSLHTTPDownloader_test : public beast::unit_test::suite beast::Journal journal_; // The SSLHTTPDownloader must be created as shared_ptr // because it uses shared_from_this - std::shared_ptr ptr_; + std::shared_ptr ptr_; Downloader(jtx::Env& env) : journal_ {sink_} - , ptr_ {std::make_shared( + , ptr_ {std::make_shared( env.app().getIOService(), journal_, env.app().config())} {} - SSLHTTPDownloader* operator->() + DatabaseDownloader* operator->() { return ptr_.get(); } @@ -104,6 +103,7 @@ class SSLHTTPDownloader_test : public beast::unit_test::suite (verify ? 
"Verify" : "No Verify"); using namespace jtx; + ripple::test::detail::FileDirGuard cert { *this, "_cert", "ca.pem", TrustedPublisherServer::ca_cert()}; @@ -152,22 +152,11 @@ class SSLHTTPDownloader_test : public beast::unit_test::suite testFailures() { testcase("Error conditions"); + using namespace jtx; + Env env {*this}; - { - // file exists - Downloader dl {env}; - ripple::test::detail::FileDirGuard const datafile { - *this, "downloads", "data", "file contents"}; - BEAST_EXPECT(!dl->download( - "localhost", - "443", - "", - 11, - datafile.file(), - std::function {std::ref(cb)})); - } { // bad hostname boost::system::error_code ec; diff --git a/src/test/rpc/ShardArchiveHandler_test.cpp b/src/test/rpc/ShardArchiveHandler_test.cpp new file mode 100644 index 000000000..3515810ea --- /dev/null +++ b/src/test/rpc/ShardArchiveHandler_test.cpp @@ -0,0 +1,364 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { +namespace test { + +class ShardArchiveHandler_test : public beast::unit_test::suite +{ + using Downloads = std::vector>; + + TrustedPublisherServer + createServer(jtx::Env& env, bool ssl = true) + { + std::vector list; + list.push_back(TrustedPublisherServer::randomValidator()); + return TrustedPublisherServer{ + env.app().getIOService(), + list, + env.timeKeeper().now() + std::chrono::seconds{3600}, + ssl}; + } + +public: + void + testStateDatabase1() + { + testcase("testStateDatabase1"); + + { + beast::temp_dir tempDir; + + auto c = jtx::envconfig(); + auto& section = c->section(ConfigSection::shardDatabase()); + section.set("path", tempDir.path()); + section.set("max_size_gb", "100"); + c->setupControl(true, true, true); + + jtx::Env env(*this, std::move(c)); + auto handler = RPC::ShardArchiveHandler::getInstance( + env.app(), env.app().getJobQueue()); + BEAST_EXPECT(handler); + + BEAST_EXPECT(handler->init()); + + std::string const rawUrl = "https://foo:443/1.tar.lz4"; + parsedURL url; + + parseUrl(url, rawUrl); + handler->add(1, {url, rawUrl}); + + { + std::lock_guard lock(handler->m_); + + auto& session{handler->sqliteDB_->getSession()}; + + soci::rowset rs = + (session.prepare << "SELECT * FROM State;"); + + uint64_t rowCount = 0; + + for (auto it = rs.begin(); it != rs.end(); ++it, ++rowCount) + { + BEAST_EXPECT(it->get(0) == 1); + BEAST_EXPECT(it->get(1) == rawUrl); + } + + BEAST_EXPECT(rowCount == 1); + } + + handler->release(); + } + + // Destroy the singleton so we start fresh in + // the next testcase. 
+ RPC::ShardArchiveHandler::instance_.reset(); + } + + void + testStateDatabase2() + { + testcase("testStateDatabase2"); + + { + beast::temp_dir tempDir; + + auto c = jtx::envconfig(); + auto& section = c->section(ConfigSection::shardDatabase()); + section.set("path", tempDir.path()); + section.set("max_size_gb", "100"); + c->setupControl(true, true, true); + + jtx::Env env(*this, std::move(c)); + auto handler = RPC::ShardArchiveHandler::getInstance( + env.app(), env.app().getJobQueue()); + BEAST_EXPECT(handler); + + BEAST_EXPECT(handler->init()); + + Downloads const dl = {{1, "https://foo:443/1.tar.lz4"}, + {2, "https://foo:443/2.tar.lz4"}, + {3, "https://foo:443/3.tar.lz4"}}; + + for (auto const& entry : dl) + { + parsedURL url; + parseUrl(url, entry.second); + handler->add(entry.first, {url, entry.second}); + } + + { + std::lock_guard lock(handler->m_); + + auto& session{handler->sqliteDB_->getSession()}; + soci::rowset rs = + (session.prepare << "SELECT * FROM State;"); + + uint64_t pos = 0; + for (auto it = rs.begin(); it != rs.end(); ++it, ++pos) + { + BEAST_EXPECT(it->get(0) == dl[pos].first); + BEAST_EXPECT(it->get(1) == dl[pos].second); + } + + BEAST_EXPECT(pos == dl.size()); + } + + handler->release(); + } + + // Destroy the singleton so we start fresh in + // the next testcase. 
+ RPC::ShardArchiveHandler::instance_.reset(); + } + + void + testStateDatabase3() + { + testcase("testStateDatabase3"); + + { + beast::temp_dir tempDir; + + auto c = jtx::envconfig(); + auto& section = c->section(ConfigSection::shardDatabase()); + section.set("path", tempDir.path()); + section.set("max_size_gb", "100"); + c->setupControl(true, true, true); + + jtx::Env env(*this, std::move(c)); + auto handler = RPC::ShardArchiveHandler::getInstance( + env.app(), env.app().getJobQueue()); + BEAST_EXPECT(handler); + + BEAST_EXPECT(handler->init()); + + auto server = createServer(env); + auto host = server.local_endpoint().address().to_string(); + auto port = std::to_string(server.local_endpoint().port()); + server.stop(); + + Downloads const dl = [&host, &port] { + Downloads ret; + + for (int i = 1; i <= 10; ++i) + { + ret.push_back({i, + (boost::format("https://%s:%d/%d.tar.lz4") % + host % port % i) + .str()}); + } + + return ret; + }(); + + for (auto const& entry : dl) + { + parsedURL url; + parseUrl(url, entry.second); + handler->add(entry.first, {url, entry.second}); + } + + BEAST_EXPECT(handler->start()); + + auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory( + env.app().config()); + + std::unique_lock lock(handler->m_); + + BEAST_EXPECT( + boost::filesystem::exists(stateDir) || + handler->archives_.empty()); + + while (!handler->archives_.empty()) + { + lock.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + lock.lock(); + } + + BEAST_EXPECT(!boost::filesystem::exists(stateDir)); + } + + // Destroy the singleton so we start fresh in + // the next testcase. 
+ RPC::ShardArchiveHandler::instance_.reset(); + } + + void + testStateDatabase4() + { + testcase("testStateDatabase4"); + + beast::temp_dir tempDir; + + { + auto c = jtx::envconfig(); + auto& section = c->section(ConfigSection::shardDatabase()); + section.set("path", tempDir.path()); + section.set("max_size_gb", "100"); + c->setupControl(true, true, true); + + jtx::Env env(*this, std::move(c)); + auto handler = RPC::ShardArchiveHandler::getInstance( + env.app(), env.app().getJobQueue()); + BEAST_EXPECT(handler); + + BEAST_EXPECT(handler->init()); + + auto server = createServer(env); + auto host = server.local_endpoint().address().to_string(); + auto port = std::to_string(server.local_endpoint().port()); + server.stop(); + + Downloads const dl = [&host, &port] { + Downloads ret; + + for (int i = 1; i <= 10; ++i) + { + ret.push_back({i, + (boost::format("https://%s:%d/%d.tar.lz4") % + host % port % i) + .str()}); + } + + return ret; + }(); + + for (auto const& entry : dl) + { + parsedURL url; + parseUrl(url, entry.second); + handler->add(entry.first, {url, entry.second}); + } + + auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory( + env.app().config()); + + boost::filesystem::copy_file( + stateDir / stateDBName, + boost::filesystem::path(tempDir.path()) / stateDBName); + + BEAST_EXPECT(handler->start()); + + std::unique_lock lock(handler->m_); + + BEAST_EXPECT( + boost::filesystem::exists(stateDir) || + handler->archives_.empty()); + + while (!handler->archives_.empty()) + { + lock.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + lock.lock(); + } + + BEAST_EXPECT(!boost::filesystem::exists(stateDir)); + + boost::filesystem::create_directory(stateDir); + + boost::filesystem::copy_file( + boost::filesystem::path(tempDir.path()) / stateDBName, + stateDir / stateDBName); + } + + // Destroy the singleton so we start fresh in + // the new scope. 
+ RPC::ShardArchiveHandler::instance_.reset(); + + auto c = jtx::envconfig(); + auto& section = c->section(ConfigSection::shardDatabase()); + section.set("path", tempDir.path()); + section.set("max_size_gb", "100"); + c->setupControl(true, true, true); + + jtx::Env env(*this, std::move(c)); + + while (!RPC::ShardArchiveHandler::hasInstance()) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + BEAST_EXPECT(RPC::ShardArchiveHandler::hasInstance()); + + auto handler = RPC::ShardArchiveHandler::getInstance(); + + auto stateDir = + RPC::ShardArchiveHandler::getDownloadDirectory(env.app().config()); + + std::unique_lock lock(handler->m_); + + BEAST_EXPECT( + boost::filesystem::exists(stateDir) || handler->archives_.empty()); + + while (!handler->archives_.empty()) + { + lock.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + lock.lock(); + } + + BEAST_EXPECT(!boost::filesystem::exists(stateDir)); + } + + void + run() override + { + testStateDatabase1(); + testStateDatabase2(); + testStateDatabase3(); + testStateDatabase4(); + } +}; + +BEAST_DEFINE_TESTSUITE(ShardArchiveHandler, app, ripple); + +} // namespace test +} // namespace ripple