Make ShardArchiveHandler downloads more resilient:

* Make ShardArchiveHandler a singleton.
* Add state database for ShardArchiveHandler.
* Use temporary database for SSLHTTPDownloader downloads.
* Make ShardArchiveHandler a Stoppable class.
* Automatically resume interrupted downloads at server start.
This commit is contained in:
Devon White
2019-12-03 13:56:19 -05:00
committed by manojsdoshi
parent cc452dfa9b
commit 905a97e0aa
21 changed files with 2308 additions and 209 deletions

View File

@@ -487,6 +487,7 @@ target_sources (rippled PRIVATE
main sources:
subdir: net
#]===============================]
src/ripple/net/impl/DatabaseDownloader.cpp
src/ripple/net/impl/HTTPClient.cpp
src/ripple/net/impl/InfoSub.cpp
src/ripple/net/impl/RPCCall.cpp
@@ -918,6 +919,7 @@ target_sources (rippled PRIVATE
src/test/rpc/RPCOverload_test.cpp
src/test/rpc/RobustTransaction_test.cpp
src/test/rpc/ServerInfo_test.cpp
src/test/rpc/ShardArchiveHandler_test.cpp
src/test/rpc/Status_test.cpp
src/test/rpc/Submit_test.cpp
src/test/rpc/Subscribe_test.cpp

View File

@@ -64,6 +64,7 @@
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/beast/asio/io_latency_probe.h>
#include <ripple/beast/core/LexicalCast.h>
#include <ripple/rpc/ShardArchiveHandler.h>
#include <boost/algorithm/string/predicate.hpp>
#include <ripple/app/main/GRPCServer.h>
@@ -1610,6 +1611,53 @@ bool ApplicationImp::setup()
}
}
if (shardStore_)
{
using namespace boost::filesystem;
auto stateDb(
RPC::ShardArchiveHandler::getDownloadDirectory(*config_)
/ stateDBName);
try
{
if (exists(stateDb) &&
is_regular_file(stateDb) &&
!RPC::ShardArchiveHandler::hasInstance())
{
auto handler = RPC::ShardArchiveHandler::recoverInstance(
*this,
*m_jobQueue);
assert(handler);
if (!handler->initFromDB())
{
JLOG(m_journal.fatal())
<< "Failed to initialize ShardArchiveHandler.";
return false;
}
if (!handler->start())
{
JLOG(m_journal.fatal())
<< "Failed to start ShardArchiveHandler.";
return false;
}
}
}
catch(std::exception const& e)
{
JLOG(m_journal.fatal())
<< "Exception when starting ShardArchiveHandler from "
"state database: " << e.what();
return false;
}
}
return true;
}

View File

@@ -181,6 +181,45 @@ std::array<char const*, 6> WalletDBInit {{
"END TRANSACTION;"
}};
////////////////////////////////////////////////////////////////////////////////
// Filename of the ShardArchiveHandler's state database, created in
// the shard download directory so interrupted downloads can be
// detected and resumed at server start.
static constexpr auto stateDBName {"state.db"};

// Pragmas applied to the downloader databases: full fsync on commit
// and a rollback journal that is deleted after each transaction, to
// protect the persisted download state against crashes.
static constexpr
std::array<char const*, 2> DownloaderDBPragma
{{
    "PRAGMA synchronous=FULL;",
    "PRAGMA journal_mode=DELETE;"
}};

// Schema for the ShardArchiveHandler state database: one row per
// queued shard download (shard index -> archive URL).
static constexpr
std::array<char const*, 3> ShardArchiveHandlerDBInit
{{
    "BEGIN TRANSACTION;",

    "CREATE TABLE IF NOT EXISTS State ( \
ShardIndex INTEGER PRIMARY KEY, \
URL TEXT \
);",

    "END TRANSACTION;"
}};

// Schema for the temporary database used by DatabaseBody downloads.
// The file being downloaded is stored as one or more BLOB rows
// (SQLite limits the size of a single BLOB), ordered by Part.
static constexpr
std::array<char const*, 3> DatabaseBodyDBInit
{{
    "BEGIN TRANSACTION;",

    "CREATE TABLE IF NOT EXISTS download ( \
Path TEXT, \
Data BLOB, \
Size BIGINT UNSIGNED, \
Part BIGINT UNSIGNED PRIMARY KEY \
);",

    "END TRANSACTION;"
}};
} // ripple
#endif

View File

@@ -102,18 +102,17 @@ public:
boost::filesystem::path pPath =
useTempFiles ? "" : (setup.dataDir / DBName);
open(session_, "sqlite", pPath.string());
init(pPath, pragma, initSQL);
}
for (auto const& p : pragma)
template<std::size_t N, std::size_t M>
DatabaseCon(
boost::filesystem::path const& dataDir,
std::string const& DBName,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL)
{
soci::statement st = session_.prepare << p;
st.execute(true);
}
for (auto const& sql : initSQL)
{
soci::statement st = session_.prepare << sql;
st.execute(true);
}
init((dataDir / DBName), pragma, initSQL);
}
soci::session& getSession()
@@ -129,6 +128,27 @@ public:
void setupCheckpointing (JobQueue*, Logs&);
private:
template<std::size_t N, std::size_t M>
void
init(boost::filesystem::path const& pPath,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL)
{
open(session_, "sqlite", pPath.string());
for (auto const& p : pragma)
{
soci::statement st = session_.prepare << p;
st.execute(true);
}
for (auto const& sql : initSQL)
{
soci::statement st = session_.prepare << sql;
st.execute(true);
}
}
LockedSociSession::mutex lock_;
soci::session session_;

View File

@@ -0,0 +1,170 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_NET_DATABASEBODY_H
#define RIPPLE_NET_DATABASEBODY_H
#include <ripple/core/DatabaseCon.h>
#include <boost/asio/io_service.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/beast/http/message.hpp>
#include <soci/sqlite3/soci-sqlite3.h>
namespace ripple {
/** A Boost.Beast HTTP message body type that streams the payload
    into a SQLite database instead of a flat file.

    Storing the partially downloaded payload in a database protects
    it against corruption from a crash, allowing interrupted
    downloads to be resumed.
*/
struct DatabaseBody
{
    // Algorithm for storing buffers when parsing.
    class reader;

    // The type of the @ref message::body member.
    class value_type;

    /** Returns the size of the body

        @param body The database body to use
    */
    static std::uint64_t
    size(value_type const& body);
};
class DatabaseBody::value_type
{
    // This body container holds a connection to the
    // database, and also caches the size when set.

    friend class reader;
    friend struct DatabaseBody;

    // The cached file size
    std::uint64_t file_size_ = 0;

    // Destination path of the file being downloaded.
    boost::filesystem::path path_;

    // Connection to the SQLite database holding the
    // partially downloaded payload.
    std::unique_ptr<DatabaseCon> conn_;

    // Bytes received but not yet written to the database.
    std::string batch_;

    // Serializes the asynchronous database writes scheduled
    // by the reader.
    std::shared_ptr<boost::asio::io_service::strand> strand_;

    // Guard handler_count_ / closing_; paired with c_ to wait
    // for scheduled write handlers to drain.
    std::mutex m_;
    std::condition_variable c_;

    // Number of database write handlers scheduled or running.
    uint64_t handler_count_ = 0;

    // Index (Part column) of the database row currently being
    // appended to.
    uint64_t part_ = 0;

    // Set while close() is waiting for handlers to finish.
    bool closing_ = false;

public:
    /// Destructor
    ~value_type() = default;

    /// Constructor
    value_type() = default;

    /// Returns `true` if the file is open
    bool
    is_open() const
    {
        return bool{conn_};
    }

    /// Returns the size of the file if open
    std::uint64_t
    size() const
    {
        return file_size_;
    }

    /// Close the file if open
    void
    close();

    /** Open the database backing the download at the given path

        @param path The utf-8 encoded destination path of the download
        @param config Used to configure the backing database
        @param io_service The service on which write handlers run
        @param ec Set to the error, if any occurred
    */
    void
    open(
        boost::filesystem::path path,
        Config const& config,
        boost::asio::io_service& io_service,
        boost::system::error_code& ec);
};
/** Algorithm for storing buffers when parsing.

    Objects of this type are created during parsing
    to store incoming buffers representing the body.
*/
class DatabaseBody::reader
{
    value_type& body_;  // The body we are writing to

    // Number of buffered bytes that triggers an asynchronous
    // flush of the batch to the database.
    static const uint32_t FLUSH_SIZE = 50000000;

    // Maximum number of flush handlers allowed to be scheduled
    // or running at once (backpressure on the writer).
    static const uint8_t MAX_HANDLERS = 3;

    // Subtracted from the SQLite blob size limit to leave room
    // for the non-blob columns in each row.
    static const uint16_t MAX_ROW_SIZE_PAD = 500;

public:
    // Constructor.
    //
    // This is called after the header is parsed and
    // indicates that a non-zero sized body may be present.
    // `h` holds the received message headers.
    // `b` is an instance of `DatabaseBody`.
    //
    template <bool isRequest, class Fields>
    explicit reader(
        boost::beast::http::header<isRequest, Fields>& h,
        value_type& b);

    // Initializer
    //
    // This is called before the body is parsed and
    // gives the reader a chance to do something that might
    // need to return an error code. It informs us of
    // the payload size (`content_length`) which we can
    // optionally use for optimization.
    //
    void
    init(boost::optional<std::uint64_t> const&, boost::system::error_code& ec);

    // This function is called one or more times to store
    // buffer sequences corresponding to the incoming body.
    //
    template <class ConstBufferSequence>
    std::size_t
    put(ConstBufferSequence const& buffers, boost::system::error_code& ec);

    // Writes a batch of body data to the database. Runs on the
    // body's strand so writes are serialized.
    void
    do_put(std::string data);

    // This function is called when writing is complete.
    // It is an opportunity to perform any final actions
    // which might fail, in order to return an error code.
    // Operations that might fail should not be attempted in
    // destructors, since an exception thrown from there
    // would terminate the program.
    //
    void
    finish(boost::system::error_code& ec);
};
} // namespace ripple
#include <ripple/net/impl/DatabaseBody.ipp>
#endif // RIPPLE_NET_DATABASEBODY_H

View File

@@ -0,0 +1,60 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_NET_DATABASEDOWNLOADER_H
#define RIPPLE_NET_DATABASEDOWNLOADER_H
#include <ripple/net/DatabaseBody.h>
#include <ripple/net/SSLHTTPDownloader.h>
namespace ripple {
/** An SSLHTTPDownloader that stores downloaded files in a SQLite
    database (via DatabaseBody) so partial downloads survive a
    crash or shutdown and can be resumed.
*/
class DatabaseDownloader : public SSLHTTPDownloader
{
public:
    DatabaseDownloader(
        boost::asio::io_service& io_service,
        beast::Journal j,
        Config const& config);

private:
    // Maximum supported length of a destination path, in characters.
    static const uint8_t MAX_PATH_LEN = std::numeric_limits<uint8_t>::max();

    // Returns a response parser whose DatabaseBody writes to dstPath.
    std::shared_ptr<parser>
    getParser(
        boost::filesystem::path dstPath,
        std::function<void(boost::filesystem::path)> complete,
        boost::system::error_code& ec) override;

    // Returns true if dstPath is an acceptable destination path.
    bool
    checkPath(boost::filesystem::path const& dstPath) override;

    // Closes the database connection held by the parser's body.
    void
    closeBody(std::shared_ptr<parser> p) override;

    // Returns the number of bytes downloaded so far.
    uint64_t
    size(std::shared_ptr<parser> p) override;

    Config const& config_;
    boost::asio::io_service& io_service_;
};
} // namespace ripple
#endif // RIPPLE_NET_DATABASEDOWNLOADER_H

View File

@@ -50,7 +50,8 @@ public:
SSLHTTPDownloader(
boost::asio::io_service& io_service,
beast::Journal j,
Config const& config);
Config const& config,
bool isPaused = false);
bool
download(
@@ -61,13 +62,36 @@ public:
boost::filesystem::path const& dstPath,
std::function<void(boost::filesystem::path)> complete);
void
onStop();
virtual
~SSLHTTPDownloader() = default;
protected:
using parser = boost::beast::http::basic_parser<false>;
beast::Journal const j_;
bool
fail(
boost::filesystem::path dstPath,
std::function<void(boost::filesystem::path)> const& complete,
boost::system::error_code const& ec,
std::string const& errMsg,
std::shared_ptr<parser> parser = nullptr);
private:
HTTPClientSSLContext ssl_ctx_;
boost::asio::io_service::strand strand_;
boost::optional<
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>> stream_;
boost::beast::flat_buffer read_buf_;
beast::Journal const j_;
std::atomic<bool> isStopped_;
bool sessionActive_;
std::mutex m_;
std::condition_variable c_;
void
do_session(
@@ -79,12 +103,25 @@ private:
std::function<void(boost::filesystem::path)> complete,
boost::asio::yield_context yield);
void
fail(
virtual
std::shared_ptr<parser>
getParser(
boost::filesystem::path dstPath,
std::function<void(boost::filesystem::path)> const& complete,
boost::system::error_code const& ec,
std::string const& errMsg);
std::function<void(boost::filesystem::path)> complete,
boost::system::error_code & ec) = 0;
virtual
bool
checkPath(
boost::filesystem::path const& dstPath) = 0;
virtual
void
closeBody(std::shared_ptr<parser> p) = 0;
virtual
uint64_t
size(std::shared_ptr<parser> p) = 0;
};
} // ripple

View File

@@ -0,0 +1,263 @@
# Shard Downloader Process
## Overview
This document describes the mechanics of the `SSLHTTPDownloader`, a class that performs the task of downloading shards from remote web servers via
SSL HTTP. The downloader utilizes a strand (`boost::asio::io_service::strand`) to ensure that downloads are never executed concurrently. Hence, if a download is in progress when another download is initiated, the second download will be queued and invoked only when the first download is completed.
## New Features
The downloader was recently (March 2020) modified to provide some key features:
- The ability to stop downloads during a graceful shutdown.
- The ability to resume partial downloads after a crash or shutdown.
- <span style="color:gray">*(Deferred) The ability to download from multiple servers to a single file.*</span>
## Classes
Much of the shard downloading process concerns the following classes:
- `SSLHTTPDownloader`
This is a generic class designed for serially executing downloads via HTTP SSL.
- `ShardArchiveHandler`
This class uses the `SSLHTTPDownloader` to fetch shards from remote web servers. Additionally, the archive handler performs sanity checks on the downloaded files and imports the validated files into the local shard store.
The `ShardArchiveHandler` exposes a simple public interface:
```C++
/** Add an archive to be downloaded and imported.
@param shardIndex the index of the shard to be imported.
@param url the location of the archive.
@return `true` if successfully added.
@note Returns false if called while downloading.
*/
bool
add(std::uint32_t shardIndex, std::pair<parsedURL, std::string>&& url);
/** Starts downloading and importing archives. */
bool
start();
```
When a client submits a `download_shard` command via the RPC interface, each of the requested files is registered with the handler via the `add` method. After all the files have been registered, the handler's `start` method is invoked, which in turn creates an instance of the `SSLHTTPDownloader` and begins the first download. When the download is completed, the downloader invokes the handler's `complete` method, which will initiate the download of the next file, or simply return if there are no more downloads to process. When `complete` is invoked with no remaining files to be downloaded, the handler and downloader are not destroyed automatically, but persist for the duration of the application.
Additionally, we leverage a novel class to provide customized parsing for downloaded files:
- `DatabaseBody`
This class will define a custom message body type, allowing an `http::response_parser` to write to a SQLite database rather than to a flat file. This class is discussed in further detail in the Recovery section.
## Execution Concept
This section describes in greater detail how the key features of the downloader are implemented in C++ using the `boost::asio` framework.
##### Member Variables:
The variables shown here are members of the `SSLHTTPDownloader` class and
will be used in the following code examples.
```c++
using boost::asio::ssl::stream;
using boost::asio::ip::tcp::socket;
stream<socket> stream_;
std::condition_variable c_;
std::atomic<bool> isStopped_;
```
### Graceful Shutdowns
##### Thread 1:
A graceful shutdown begins when the `onStop()` method of the `ShardArchiveHandler` is invoked:
```c++
void
ShardArchiveHandler::onStop()
{
std::lock_guard<std::mutex> lock(m_);
if (downloader_)
{
downloader_->onStop();
downloader_.reset();
}
stopped();
}
```
Inside of `SSLHTTPDownloader::onStop()`, if a download is currently in progress, the `isStopped_` member variable is set and the thread waits for the download to stop:
```c++
void
SSLHTTPDownloader::onStop()
{
std::unique_lock lock(m_);
isStopped_ = true;
if(sessionActive_)
{
// Wait for the handler to exit.
c_.wait(lock,
[this]()
{
return !sessionActive_;
});
}
}
```
##### Thread 2:
The graceful shutdown is realized when the thread executing the download polls `isStopped_` after this variable has been set to `true`. Polling only occurs while the file is being downloaded, in between calls to `async_read_some()`. The stop takes effect when the socket is closed and the handler function ( `do_session()` ) is exited.
```c++
void SSLHTTPDownloader::do_session()
{
// (Connection initialization logic)
.
.
.
// (In between calls to async_read_some):
if(isStopped_.load())
{
close(p);
return exit();
}
.
.
.
break;
}
```
### Recovery
Persisting the current state of both the archive handler and the downloader is achieved by leveraging a SQLite database rather than flat files, as the database protects against data corruption that could result from a system crash.
##### ShardArchiveHandler
Although `SSLHTTPDownloader` is a generic class that could be used to download a variety of file types, currently it is used exclusively by the `ShardArchiveHandler` to download shards. In order to provide resilience, the `ShardArchiveHandler` will utilize a SQLite database to preserve its current state whenever there are active, paused, or queued downloads. The `shard_db` section in the configuration file allows users to specify the location of the database to use for this purpose.
###### SQLite Table Format
| Index | URL |
|:-----:|:-----------------------------------:|
| 1 | ht<span />tps://example.com/1.tar.lz4 |
| 2 | ht<span />tps://example.com/2.tar.lz4 |
| 5 | ht<span />tps://example.com/5.tar.lz4 |
##### SSLHTTPDownloader
While the archive handler maintains a list of all partial and queued downloads, the `SSLHTTPDownloader` stores the raw bytes of the file currently being downloaded. The partially downloaded file will be represented as one or more `BLOB` entries in a SQLite database. As the maximum size of a `BLOB` entry is currently limited to roughly 2.1 GB, a 5 GB shard file for instance will occupy three database entries upon completion.
###### SQLite Table Format
Since downloads execute serially by design, the entries in this table always correspond to the content of a single file.
| Bytes | Size | Part |
|:------:|:----------:|:----:|
| 0x... | 2147483647 | 0 |
| 0x... | 2147483647 | 1 |
| 0x... | 705032706 | 2 |
##### Config File Entry
The `download_path` field of the `shard_db` entry will be used to determine where to store the recovery database. If this field is omitted, the `path` field will be used instead.
```dosini
# This is the persistent datastore for shards. It is important for the health
# of the ripple network that rippled operators shard as much as practical.
# NuDB requires SSD storage. Helpful information can be found here
# https://ripple.com/build/history-sharding
[shard_db]
type=NuDB
path=/var/lib/rippled/db/shards/nudb
download_path=/var/lib/rippled/db/shards/
max_size_gb=50
```
##### Resuming Partial Downloads
When resuming downloads after a crash or other interruption, the `SSLHTTPDownloader` will utilize the `range` field of the HTTP header to download only the remainder of the partially downloaded file.
```C++
auto downloaded = getPartialFileSize();
auto total = getTotalFileSize();
http::request<http::file_body> req {http::verb::head,
target,
version};
if (downloaded < total)
{
// If we already downloaded 1000 bytes to the partial file,
// the range header will look like:
// Range: "bytes=1000-"
req.set(http::field::range, "bytes=" + to_string(downloaded) + "-");
}
else if(downloaded == total)
{
// Download is already complete. (Interruption must
// have occurred after the file was downloaded but before
// the state file was updated.)
}
else
{
// The size of the partially downloaded file exceeds
// the total download size. Error condition. Handle
// appropriately.
}
```
##### DatabaseBody
Previously, the `SSLHTTPDownloader` leveraged an `http::response_parser` instantiated with an `http::file_body`. The `file_body` class declares a nested type, `reader`, which does the task of writing HTTP message payloads (constituting a requested file) to the filesystem. In order for the `http::response_parser` to interface with the database, we implement a custom body type that declares a nested `reader` type which has been outfitted to persist octets received from the remote host to a local SQLite database. The code snippet below illustrates the customization points available to user-defined body types:
```C++
/// Defines a Body type
struct body
{
/// This determines the return type of the `message::body` member function
using value_type = ...;
/// An optional function, returns the body's payload size (which may be zero)
static
std::uint64_t
size(value_type const& v);
/// The algorithm used for extracting buffers
class reader;
/// The algorithm used for inserting buffers
class writer;
}
```
The method invoked to write data to the filesystem (or SQLite database in our case) has the following signature:
```C++
std::size_t
body::reader::put(ConstBufferSequence const& buffers, error_code& ec);
```
## Sequence Diagram
This sequence diagram demonstrates a scenario wherein the `ShardArchiveHandler` leverages the state persisted in the database to recover from a crash and resume the scheduled downloads.
![alt_text](./images/interrupt_sequence.png "Resuming downloads post abort")
## State Diagram
This diagram illustrates the various states of the Shard Downloader module.
![alt_text](./images/states.png "Shard Downloader states")

Binary file not shown.

After

Width:  |  Height:  |  Size: 197 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 117 KiB

View File

@@ -0,0 +1,312 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
namespace ripple {
inline void
DatabaseBody::value_type::close()
{
{
std::unique_lock lock(m_);
// Stop all scheduled and currently
// executing handlers before closing.
if (handler_count_)
{
closing_ = true;
auto predicate = [&] { return !handler_count_; };
c_.wait(lock, predicate);
}
conn_.reset();
}
}
/** Open the database backing the download at the given path.

    Creates (or reuses) a SQLite database in the destination
    directory. If the database already contains rows for a
    different file, they are discarded; if it contains rows for
    this file, the download is resumed from the stored size.

    @param path The utf-8 encoded destination path of the download
    @param config Used to configure the backing database
    @param io_service The service on which write handlers run
    @param ec Set to the error, if any occurred
*/
inline void
DatabaseBody::value_type::open(
    boost::filesystem::path path,
    Config const& config,
    boost::asio::io_service& io_service,
    boost::system::error_code& ec)
{
    strand_.reset(new boost::asio::io_service::strand(io_service));
    auto setup = setup_DatabaseCon(config);
    setup.dataDir = path.parent_path();

    // Opening the connection creates the Download table if it
    // doesn't already exist (DatabaseBodyDBInit).
    conn_ = std::make_unique<DatabaseCon>(
        setup, "Download", DownloaderDBPragma, DatabaseBodyDBInit);

    path_ = path;

    auto db = conn_->checkoutDb();

    boost::optional<std::string> pathFromDb;

    *db << "SELECT Path FROM Download WHERE Part=0;", soci::into(pathFromDb);

    // Try to reuse preexisting
    // database.
    if (pathFromDb)
    {
        // Can't reuse - database was
        // from a different file download.
        if (pathFromDb != path.string())
        {
            // Clear the stale rows rather than dropping the table:
            // the schema is only created when the connection is
            // opened above, so dropping the table here would make
            // every subsequent INSERT in do_put() fail.
            *db << "DELETE FROM Download;";
        }
        // Continuing a file download.
        else
        {
            boost::optional<uint64_t> size;

            *db << "SELECT SUM(LENGTH(Data)) FROM Download;",
                soci::into(size);

            if (size)
                file_size_ = size.get();
        }
    }
}
// This is called from message::payload_size
inline std::uint64_t
DatabaseBody::size(value_type const& body)
{
    // Forward the call to the body, which caches the byte count.
    return body.size();
}
// We don't do much in the reader constructor since the
// database is already open. The header argument is unused.
//
template <bool isRequest, class Fields>
DatabaseBody::reader::reader(
    boost::beast::http::header<isRequest, Fields>&,
    value_type& body)
    : body_(body)
{
}
// Called once before the body is parsed.
//
// We don't do anything with content_length but a sophisticated
// application might check available space on the device
// to see if there is enough room to store the body.
inline void
DatabaseBody::reader::init(
    boost::optional<std::uint64_t> const& /*content_length*/,
    boost::system::error_code& ec)
{
    // The connection must already be available for writing
    // (value_type::open must have been called first).
    assert(body_.conn_);

    // The error_code specification requires that we
    // either set the error to some value, or set it
    // to indicate no error.
    //
    // We don't do anything fancy so set "no error"
    ec = {};
}
// This will get called one or more times with body buffers.
//
// Incoming bytes are appended to an in-memory batch; once the
// batch exceeds FLUSH_SIZE, a database write is scheduled on the
// body's strand. At most MAX_HANDLERS writes may be outstanding;
// when that cap is reached the batch simply keeps accumulating
// and is flushed on a later call (or in finish()).
//
template <class ConstBufferSequence>
std::size_t
DatabaseBody::reader::put(
    ConstBufferSequence const& buffers,
    boost::system::error_code& ec)
{
    // This function must return the total number of
    // bytes transferred from the input buffers.
    std::size_t nwritten = 0;

    // Loop over all the buffers in the sequence,
    // and write each one to the database.
    for (auto it = buffer_sequence_begin(buffers);
         it != buffer_sequence_end(buffers);
         ++it)
    {
        boost::asio::const_buffer buffer = *it;

        body_.batch_.append(
            static_cast<const char*>(buffer.data()), buffer.size());

        // Write this buffer to the database
        if (body_.batch_.size() > FLUSH_SIZE)
        {
            bool post = true;

            // Reserve a handler slot; if all slots are taken,
            // defer the flush (backpressure).
            {
                std::lock_guard lock(body_.m_);

                if (body_.handler_count_ >= MAX_HANDLERS)
                    post = false;
                else
                    ++body_.handler_count_;
            }

            if (post)
            {
                // The lambda copies the batch so the member can
                // be cleared immediately and reused.
                body_.strand_->post(
                    [data = body_.batch_, this] { this->do_put(data); });

                body_.batch_.clear();
            }
        }

        nwritten += it->size();
    }

    // Indicate success
    // This is required by the error_code specification
    ec = {};

    return nwritten;
}
// Writes one batch of body data to the database. Runs on the
// body's strand. Because a single SQLite BLOB is size-limited,
// the file is stored across one or more rows (Part 0, 1, ...);
// this function appends to the newest row and starts new rows
// as needed.
inline void
DatabaseBody::reader::do_put(std::string data)
{
    using namespace boost::asio;

    {
        std::unique_lock lock(body_.m_);

        // The download is being halted: drop the data and, if we
        // are the last outstanding handler, wake close().
        if (body_.closing_)
        {
            if (--body_.handler_count_ == 0)
            {
                lock.unlock();
                body_.c_.notify_one();
            }

            return;
        }
    }

    auto path = body_.path_.string();
    uint64_t rowSize;
    soci::indicator rti;
    uint64_t remainingInRow;

    auto db = body_.conn_->checkoutDb();

    // Need the raw sqlite3 backend to query the engine's limits.
    auto be = dynamic_cast<soci::sqlite3_session_backend*>(db->get_backend());
    BOOST_ASSERT(be);

    // This limits how large we can make the blob
    // in each row. Also subtract a pad value to
    // account for the other values in the row.
    auto const blobMaxSize =
        sqlite_api::sqlite3_limit(be->conn_, SQLITE_LIMIT_LENGTH, -1) -
        MAX_ROW_SIZE_PAD;

    // Starts a fresh, empty row for the next part of the file.
    auto rowInit = [&] {
        *db << "INSERT INTO Download VALUES (:path, zeroblob(0), 0, :part)",
            soci::use(path), soci::use(body_.part_);

        remainingInRow = blobMaxSize;
        rowSize = 0;
    };

    // Find the row currently being appended to (the highest Part).
    *db << "SELECT Path,Size,Part FROM Download ORDER BY Part DESC "
           "LIMIT 1",
        soci::into(path), soci::into(rowSize), soci::into(body_.part_, rti);

    if (!db->got_data())
        rowInit();
    else
        remainingInRow = blobMaxSize - rowSize;

    // Appends data to the current row and bumps the cached
    // file size accordingly.
    auto insert = [&db, &rowSize, &part = body_.part_, &fs = body_.file_size_](
                      auto const& data) {
        uint64_t updatedSize = rowSize + data.size();

        *db << "UPDATE Download SET Data = CAST(Data || :data AS blob), "
               "Size = :size WHERE Part = :part;",
            soci::use(data), soci::use(updatedSize), soci::use(part);

        fs += data.size();
    };

    // Spill into additional rows while the data exceeds the
    // remaining capacity of the current row.
    while (remainingInRow < data.size())
    {
        if (remainingInRow)
        {
            insert(data.substr(0, remainingInRow));
            data.erase(0, remainingInRow);
        }

        ++body_.part_;
        rowInit();
    }

    insert(data);

    // If we were the last outstanding handler, wake any thread
    // waiting in close() or finish().
    bool const notify = [this] {
        std::lock_guard lock(body_.m_);
        return --body_.handler_count_ == 0;
    }();

    if (notify)
        body_.c_.notify_one();
}
// Called after writing is done when there's no error.
//
// Drains outstanding database writes, then assembles the final
// file on disk by concatenating the stored parts plus any bytes
// still sitting in the in-memory batch.
//
// NOTE(review): ec is never set here, so callers observe success
// even if opening or writing the output file fails — confirm this
// is intended.
inline void
DatabaseBody::reader::finish(boost::system::error_code& ec)
{
    {
        std::unique_lock lock(body_.m_);

        // Wait for scheduled DB writes
        // to complete.
        if (body_.handler_count_)
        {
            auto predicate = [&] { return !body_.handler_count_; };
            body_.c_.wait(lock, predicate);
        }
    }

    auto db = body_.conn_->checkoutDb();

    soci::rowset<std::string> rs =
        (db->prepare << "SELECT Data FROM Download ORDER BY PART ASC;");

    std::ofstream fout;
    fout.open(body_.path_.string(), std::ios::binary | std::ios::out);

    // iteration through the resultset:
    for (auto it = rs.begin(); it != rs.end(); ++it)
        fout.write(it->data(), it->size());

    // Flush any pending data that hasn't
    // been written to the DB.
    if (body_.batch_.size())
    {
        fout.write(body_.batch_.data(), body_.batch_.size());
        body_.batch_.clear();
    }

    fout.close();
}
} // namespace ripple

View File

@@ -0,0 +1,88 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/net/DatabaseDownloader.h>
namespace ripple
{
/** Constructs a downloader whose payloads are written to a SQLite
    database rather than a flat file.

    @param io_service The service used for download handlers
    @param j Journal for logging
    @param config Used to configure the backing database
*/
DatabaseDownloader::DatabaseDownloader(
    boost::asio::io_service & io_service,
    beast::Journal j,
    Config const & config)
    : SSLHTTPDownloader(io_service, j, config)
    , config_(config)
    , io_service_(io_service)
{
}
// Creates a response parser whose DatabaseBody writes the payload
// to dstPath. On failure the fail() handler is invoked and the
// body is closed; the parser is still returned, with ec set so
// the caller can detect the error.
auto
DatabaseDownloader::getParser(boost::filesystem::path dstPath,
    std::function<void(boost::filesystem::path)> complete,
    boost::system::error_code & ec) -> std::shared_ptr<parser>
{
    using namespace boost::beast;

    auto p = std::make_shared<http::response_parser<DatabaseBody >>();

    // Downloads may exceed the parser's default body size limit.
    p->body_limit(std::numeric_limits<std::uint64_t>::max());

    p->get().body().open(
        dstPath,
        config_,
        io_service_,
        ec);

    if(ec)
    {
        p->get().body().close();
        fail(dstPath, complete, ec, "open");
    }

    return p;
}
// Accepts only destination paths no longer than MAX_PATH_LEN
// characters.
bool
DatabaseDownloader::checkPath(boost::filesystem::path const & dstPath)
{
    auto const pathLength = dstPath.string().size();
    return pathLength <= MAX_PATH_LEN;
}
void
DatabaseDownloader::closeBody(std::shared_ptr<parser> p)
{
using namespace boost::beast;
auto databaseBodyParser = std::dynamic_pointer_cast<
http::response_parser<DatabaseBody >>(p);
assert(databaseBodyParser);
databaseBodyParser->get().body().close();
}
uint64_t
DatabaseDownloader::size(std::shared_ptr<parser> p)
{
using namespace boost::beast;
auto databaseBodyParser = std::dynamic_pointer_cast<
http::response_parser<DatabaseBody >>(p);
assert(databaseBodyParser);
return databaseBodyParser->get().body().size();
}
} // ripple

View File

@@ -25,10 +25,13 @@ namespace ripple {
SSLHTTPDownloader::SSLHTTPDownloader(
boost::asio::io_service& io_service,
beast::Journal j,
Config const& config)
: ssl_ctx_(config, j, boost::asio::ssl::context::tlsv12_client)
Config const& config,
bool isPaused)
: j_(j)
, ssl_ctx_(config, j, boost::asio::ssl::context::tlsv12_client)
, strand_(io_service)
, j_(j)
, isStopped_(false)
, sessionActive_(false)
{
}
@@ -41,21 +44,8 @@ SSLHTTPDownloader::download(
boost::filesystem::path const& dstPath,
std::function<void(boost::filesystem::path)> complete)
{
try
{
if (exists(dstPath))
{
JLOG(j_.error()) <<
"Destination file exists";
if (!checkPath(dstPath))
return false;
}
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
return false;
}
if (!strand_.running_in_this_thread())
strand_.post(
@@ -84,6 +74,24 @@ SSLHTTPDownloader::download(
return true;
}
void
SSLHTTPDownloader::onStop()
{
    // Stoppable hook invoked at server shutdown. Sets the stop flag
    // (checked periodically by the download coroutine) and then blocks
    // until any in-flight session acknowledges the stop by clearing
    // sessionActive_ and signaling the condition variable.
    std::unique_lock lock(m_);
    // Must be set under the lock so the session cannot miss the stop
    // request between its flag check and its wait/notify handshake.
    isStopped_ = true;
    if(sessionActive_)
    {
        // Wait for the handler to exit.
        c_.wait(lock,
            [this]()
            {
                return !sessionActive_;
            });
    }
}
void
SSLHTTPDownloader::do_session(
std::string const host,
@@ -98,10 +106,19 @@ SSLHTTPDownloader::do_session(
using namespace boost::beast;
boost::system::error_code ec;
bool skip = false;
//////////////////////////////////////////////
// Define lambdas for encapsulating download
// operations:
auto connect = [&](std::shared_ptr<parser> parser)
{
uint64_t const rangeStart = size(parser);
ip::tcp::resolver resolver {strand_.context()};
auto const results = resolver.async_resolve(host, port, yield[ec]);
if (ec)
return fail(dstPath, complete, ec, "async_resolve");
return fail(dstPath, complete, ec, "async_resolve", parser);
try
{
@@ -110,34 +127,41 @@ SSLHTTPDownloader::do_session(
catch (std::exception const& e)
{
return fail(dstPath, complete, ec,
std::string("exception: ") + e.what());
std::string("exception: ") + e.what(), parser);
}
ec = ssl_ctx_.preConnectVerify(*stream_, host);
if (ec)
return fail(dstPath, complete, ec, "preConnectVerify");
return fail(dstPath, complete, ec, "preConnectVerify", parser);
boost::asio::async_connect(
stream_->next_layer(), results.begin(), results.end(), yield[ec]);
if (ec)
return fail(dstPath, complete, ec, "async_connect");
return fail(dstPath, complete, ec, "async_connect", parser);
ec = ssl_ctx_.postConnectVerify(*stream_, host);
if (ec)
return fail(dstPath, complete, ec, "postConnectVerify");
return fail(dstPath, complete, ec, "postConnectVerify", parser);
stream_->async_handshake(ssl::stream_base::client, yield[ec]);
if (ec)
return fail(dstPath, complete, ec, "async_handshake");
return fail(dstPath, complete, ec, "async_handshake", parser);
// Set up an HTTP HEAD request message to find the file size
http::request<http::empty_body> req {http::verb::head, target, version};
req.set(http::field::host, host);
req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
// Requesting a portion of the file
if (rangeStart)
{
req.set(http::field::range,
(boost::format("bytes=%llu-") % rangeStart).str());
}
http::async_write(*stream_, req, yield[ec]);
if(ec)
return fail(dstPath, complete, ec, "async_write");
return fail(dstPath, complete, ec, "async_write", parser);
{
// Check if available storage for file size
@@ -145,51 +169,82 @@ SSLHTTPDownloader::do_session(
p.skip(true);
http::async_read(*stream_, read_buf_, p, yield[ec]);
if(ec)
return fail(dstPath, complete, ec, "async_read");
if (auto len = p.content_length())
return fail(dstPath, complete, ec, "async_read", parser);
// Range request was rejected
if(p.get().result() == http::status::range_not_satisfiable)
{
req.erase(http::field::range);
http::async_write(*stream_, req, yield[ec]);
if(ec)
return fail(dstPath, complete, ec,
"async_write_range_verify", parser);
http::response_parser<http::empty_body> p;
p.skip(true);
http::async_read(*stream_, read_buf_, p, yield[ec]);
if(ec)
return fail(dstPath, complete, ec,
"async_read_range_verify", parser);
// The entire file is downloaded already.
if(p.content_length() == rangeStart)
skip = true;
else
return fail(dstPath, complete, ec,
"range_not_satisfiable", parser);
}
else if (rangeStart &&
p.get().result() != http::status::partial_content)
{
ec.assign(boost::system::errc::not_supported,
boost::system::generic_category());
return fail(dstPath, complete, ec,
"Range request ignored", parser);
}
else if (auto len = p.content_length())
{
try
{
if (*len > space(dstPath.parent_path()).available)
{
return fail(dstPath, complete, ec,
"Insufficient disk space for download");
"Insufficient disk space for download", parser);
}
}
catch (std::exception const& e)
{
return fail(dstPath, complete, ec,
std::string("exception: ") + e.what());
std::string("exception: ") + e.what(), parser);
}
}
}
if(!skip)
{
// Set up an HTTP GET request message to download the file
req.method(http::verb::get);
if (rangeStart)
{
req.set(http::field::range,
(boost::format("bytes=%llu-") % rangeStart).str());
}
}
http::async_write(*stream_, req, yield[ec]);
if(ec)
return fail(dstPath, complete, ec, "async_write");
return fail(dstPath, complete, ec, "async_write", parser);
// Download the file
http::response_parser<http::file_body> p;
p.body_limit(std::numeric_limits<std::uint64_t>::max());
p.get().body().open(
dstPath.string().c_str(),
boost::beast::file_mode::write,
ec);
if (ec)
{
p.get().body().close();
return fail(dstPath, complete, ec, "open");
}
return true;
};
http::async_read(*stream_, read_buf_, p, yield[ec]);
if (ec)
auto close = [&](auto p)
{
p.get().body().close();
return fail(dstPath, complete, ec, "async_read");
}
p.get().body().close();
closeBody(p);
// Gracefully close the stream
stream_->async_shutdown(yield[ec]);
@@ -204,20 +259,78 @@ SSLHTTPDownloader::do_session(
}
// The socket cannot be reused
stream_ = boost::none;
};
auto getParser = [&]
{
auto p = this->getParser(dstPath, complete, ec);
if (ec)
fail(dstPath, complete, ec, "getParser", p);
return p;
};
// When the downloader is being stopped
// because the server is shutting down,
// this method notifies a 'Stoppable'
// object that the session has ended.
auto exit = [this]()
{
std::lock_guard<std::mutex> lock(m_);
sessionActive_ = false;
c_.notify_one();
};
// end lambdas
////////////////////////////////////////////////////////////
{
std::lock_guard<std::mutex> lock(m_);
sessionActive_ = true;
}
if(isStopped_.load())
return exit();
auto p = getParser();
if (ec)
return exit();
if (!connect(p) || ec)
return exit();
if(skip)
p->skip(true);
// Download the file
while (!p->is_done())
{
if(isStopped_.load())
{
close(p);
return exit();
}
http::async_read_some(*stream_, read_buf_, *p, yield[ec]);
}
JLOG(j_.trace()) <<
"download completed: " << dstPath.string();
close(p);
exit();
// Notify the completion handler
complete(std::move(dstPath));
}
void
bool
SSLHTTPDownloader::fail(
boost::filesystem::path dstPath,
std::function<void(boost::filesystem::path)> const& complete,
boost::system::error_code const& ec,
std::string const& errMsg)
std::string const& errMsg,
std::shared_ptr<parser> parser)
{
if (!ec)
{
@@ -230,18 +343,21 @@ SSLHTTPDownloader::fail(
errMsg << ": " << ec.message();
}
if (parser)
closeBody(parser);
try
{
remove(dstPath);
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
JLOG(j_.error()) << "exception: " << e.what()
<< " in function: " << __func__;
}
complete(std::move(dstPath));
return false;
}
}// ripple

View File

@@ -0,0 +1,233 @@
@startuml
skinparam shadowing false
/'
skinparam sequence {
ArrowColor #e1e4e8
ActorBorderColor #e1e4e8
DatabaseBorderColor #e1e4e8
LifeLineBorderColor Black
LifeLineBackgroundColor #d3d6d9
ParticipantBorderColor DeepSkyBlue
ParticipantBackgroundColor DodgerBlue
ParticipantFontName Impact
ParticipantFontSize 17
ParticipantFontColor #A9DCDF
NoteBackgroundColor #6a737d
ActorBackgroundColor #f6f8fa
ActorFontColor #6a737d
ActorFontSize 17
ActorFontName Aapex
EntityBackgroundColor #f6f8fa
EntityFontColor #6a737d
EntityFontSize 17
EntityFontName Aapex
DatabaseBackgroundColor #f6f8fa
DatabaseFontColor #6a737d
DatabaseFontSize 17
DatabaseFontName Aapex
CollectionsBackgroundColor #f6f8fa
ActorFontColor #6a737d
ActorFontSize 17
ActorFontName Aapex
}
skinparam note {
BackgroundColor #fafbfc
BorderColor #e1e4e8
}
'/
'skinparam monochrome true
actor Client as c
entity RippleNode as rn
entity ShardArchiveHandler as sa
entity SSLHTTPDownloader as d
database Database as db
collections Fileserver as s
c -> rn: Launch RippleNode
activate rn
c -> rn: Issue download request
note right of c
**Download Request:**
{
"method": "download_shard",
"params":
[
{
"shards":
[
{"index": 1, "url": "https://example.com/1.tar.lz4"},
{"index": 2, "url": "https://example.com/2.tar.lz4"},
{"index": 5, "url": "https://example.com/5.tar.lz4"}
]
}
]
}
end note
rn -> sa: Create instance of Handler
activate sa
rn -> sa: Add three downloads
sa -> sa: Validate requested downloads
rn -> sa: Initiate Downloads
sa -> rn: ACK: Initiating
rn -> c: Initiating requested downloads
sa -> db: Save state to the database\n(Processing three downloads)
note right of db
**ArchiveHandler State (SQLite Table):**
| Index | URL |
| 1 | https://example.com/1.tar.lz4 |
| 2 | https://example.com/2.tar.lz4 |
| 5 | https://example.com/5.tar.lz4 |
end note
sa -> d: Create instance of Downloader
activate d
group Download 1
note over sa
**Download 1:**
This encapsulates the download of the first file
at URL "https://example.com/1.tar.lz4".
end note
sa -> d: Start download
d -> s: Connect and request file
s -> d: Send file
d -> sa: Invoke completion handler
end
sa -> sa: Import and validate shard
sa -> db: Update persisted state\n(Remove download)
note right of db
**ArchiveHandler State:**
| Index | URL |
| 2 | https://example.com/2.tar.lz4 |
| 5 | https://example.com/5.tar.lz4 |
end note
group Download 2
sa -> d: Start download
d -> s: Connect and request file
end
rn -> rn: **RippleNode crashes**
deactivate sa
deactivate rn
deactivate d
c -> rn: Restart RippleNode
activate rn
rn -> db: Detect non-empty state database
rn -> sa: Create instance of Handler
activate sa
sa -> db: Load state
note right of db
**ArchiveHandler State:**
| Index | URL |
| 2 | https://example.com/2.tar.lz4 |
| 5 | https://example.com/5.tar.lz4 |
end note
sa -> d: Create instance of Downloader
activate d
sa -> sa: Resume Download 2
group Download 2
sa -> d: Start download
d -> s: Connect and request file
s -> d: Send file
d -> sa: Invoke completion handler
end
sa -> sa: Import and validate shard
sa -> db: Update persisted state \n(Remove download)
note right of db
**ArchiveHandler State:**
| Index | URL |
| 5 | https://example.com/5.tar.lz4 |
end note
group Download 3
sa -> d: Start download
d -> s: Connect and request file
s -> d: Send file
d -> sa: Invoke completion handler
end
sa -> sa: Import and validate shard
sa -> db: Update persisted state \n(Remove download)
note right of db
**ArchiveHandler State:**
***empty***
end note
sa -> db: Remove empty database
sa -> sa: Automatically destroyed
deactivate sa
d -> d: Destroyed via reference\ncounting
deactivate d
c -> rn: Poll RippleNode to verify successful\nimport of all requested shards.
c -> rn: Shutdown RippleNode
deactivate rn
@enduml

View File

@@ -0,0 +1,69 @@
@startuml
state "Updating Database" as UD4 {
UD4: Update the database to reflect
UD4: the current state.
}
state "Initiating Download" as ID {
ID: Omit the range header to download
ID: the entire file.
}
state "Evaluate Database" as ED {
ED: Determine the current state
ED: based on the contents of the
ED: database from a previous run.
}
state "Remove Database" as RD {
RD: The database is destroyed when
RD: empty.
}
state "Download in Progress" as DP
state "Download Completed" as DC {
state "Updating Database" as UD {
UD: Update the database to reflect
UD: the current state.
}
state "Queue Check" as QC {
QC: Check the queue for any remaining
QC: downloads.
}
[*] --> UD
UD --> QC
}
state "Check Resume" as CR {
CR: Determine whether we're resuming
CR: a previous download or starting a
CR: new one.
}
state "Resuming Download" as IPD {
IPD: Set the range header in the
IPD: HTTP request as needed.
}
[*] --> ED : State DB is present at\nnode launch
ED --> RD : State DB is empty
ED --> CR : There are downloads queued
RD --> [*]
[*] --> UD4 : Client invokes <<download_shard>>\ncommand
UD4 --> ID : Database updated
ID --> DP : Download started
DP --> DC : Download completed
DC --> ID : There **are** additional downloads\nqueued
DP --> [*] : A graceful shutdown is\nin progress
DC --> RD : There **are no** additional\ndownloads queued
CR --> IPD : Resuming an interrupted\ndownload
IPD --> DP: Download started
CR --> ID : Initiating a new\ndownload
@enduml

View File

@@ -23,27 +23,76 @@
#include <ripple/app/main/Application.h>
#include <ripple/basics/BasicConfig.h>
#include <ripple/basics/StringUtilities.h>
#include <ripple/net/SSLHTTPDownloader.h>
#include <ripple/net/DatabaseDownloader.h>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/filesystem.hpp>
namespace ripple {
namespace test { class ShardArchiveHandler_test; }
namespace RPC {
/** Handles the download and import of one or more shard archives. */
class ShardArchiveHandler
: public std::enable_shared_from_this <ShardArchiveHandler>
: public Stoppable
, public std::enable_shared_from_this <ShardArchiveHandler>
{
public:
using pointer = std::shared_ptr<ShardArchiveHandler>;
friend class test::ShardArchiveHandler_test;
static
boost::filesystem::path
getDownloadDirectory(Config const& config);
static
pointer
getInstance();
static
pointer
getInstance(Application& app, Stoppable& parent);
static
pointer
recoverInstance(Application& app, Stoppable& parent);
static
bool
hasInstance();
bool
init();
bool
initFromDB();
~ShardArchiveHandler() = default;
bool
add(std::uint32_t shardIndex, std::pair<parsedURL, std::string>&& url);
/** Starts downloading and importing archives. */
bool
start();
void
release();
private:
ShardArchiveHandler() = delete;
ShardArchiveHandler(ShardArchiveHandler const&) = delete;
ShardArchiveHandler& operator= (ShardArchiveHandler&&) = delete;
ShardArchiveHandler& operator= (ShardArchiveHandler const&) = delete;
ShardArchiveHandler(Application& app);
ShardArchiveHandler(
Application& app,
Stoppable& parent,
bool recovery = false);
~ShardArchiveHandler();
void onStop () override;
/** Add an archive to be downloaded and imported.
@param shardIndex the index of the shard to be imported.
@@ -52,13 +101,9 @@ public:
@note Returns false if called while downloading.
*/
bool
add(std::uint32_t shardIndex, parsedURL&& url);
add(std::uint32_t shardIndex, parsedURL&& url,
std::lock_guard<std::mutex> const&);
/** Starts downloading and importing archives. */
bool
start();
private:
// Begins the download and import of the next archive.
bool
next(std::lock_guard<std::mutex>& l);
@@ -75,14 +120,21 @@ private:
void
remove(std::lock_guard<std::mutex>&);
void
doRelease(std::lock_guard<std::mutex> const&);
static std::mutex instance_mutex_;
static pointer instance_;
std::mutex mutable m_;
Application& app_;
std::shared_ptr<SSLHTTPDownloader> downloader_;
beast::Journal const j_;
std::unique_ptr<DatabaseCon> sqliteDB_;
std::shared_ptr<DatabaseDownloader> downloader_;
boost::filesystem::path const downloadDir_;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_;
bool process_;
std::map<std::uint32_t, parsedURL> archives_;
beast::Journal const j_;
};
} // RPC

View File

@@ -77,7 +77,7 @@ doDownloadShard(RPC::JsonContext& context)
// Validate shards
static const std::string ext {".tar.lz4"};
std::map<std::uint32_t, parsedURL> archives;
std::map<std::uint32_t, std::pair<parsedURL, std::string>> archives;
for (auto& it : context.params[jss::shards])
{
// Validate the index
@@ -94,7 +94,8 @@ doDownloadShard(RPC::JsonContext& context)
if (!it.isMember(jss::url))
return RPC::missing_field_error(jss::url);
parsedURL url;
if (!parseUrl(url, it[jss::url].asString()) ||
auto unparsedURL = it[jss::url].asString();
if (!parseUrl(url, unparsedURL) ||
url.domain.empty() || url.path.empty())
{
return RPC::invalid_field_error(jss::url);
@@ -116,16 +117,39 @@ doDownloadShard(RPC::JsonContext& context)
}
// Check for duplicate indexes
if (!archives.emplace(jv.asUInt(), std::move(url)).second)
if (!archives.emplace(jv.asUInt(),
std::make_pair(std::move(url), unparsedURL)).second)
{
return RPC::make_param_error("Invalid field '" +
std::string(jss::index) + "', duplicate shard ids.");
}
}
// Begin downloading. The handler keeps itself alive while downloading.
auto handler {
std::make_shared<RPC::ShardArchiveHandler>(context.app)};
RPC::ShardArchiveHandler::pointer handler;
try
{
handler = RPC::ShardArchiveHandler::hasInstance() ?
RPC::ShardArchiveHandler::getInstance() :
RPC::ShardArchiveHandler::getInstance(
context.app,
context.app.getJobQueue());
if(!handler)
return RPC::make_error (rpcINTERNAL,
"Failed to create ShardArchiveHandler.");
if(!handler->init())
return RPC::make_error (rpcINTERNAL,
"Failed to initiate ShardArchiveHandler.");
}
catch (std::exception const& e)
{
return RPC::make_error (rpcINTERNAL,
std::string("Failed to start download: ") +
e.what());
}
for (auto& [index, url] : archives)
{
if (!handler->add(index, std::move(url)))
@@ -135,8 +159,13 @@ doDownloadShard(RPC::JsonContext& context)
std::to_string(index) + " exists or being acquired");
}
}
// Begin downloading.
if (!handler->start())
{
handler->release();
return rpcError(rpcINTERNAL);
}
std::string s {"Downloading shard"};
preShards = shardStore->getPreShards();

View File

@@ -61,10 +61,12 @@ Json::Value doManifest (RPC::JsonContext&);
Json::Value doNoRippleCheck (RPC::JsonContext&);
Json::Value doOwnerInfo (RPC::JsonContext&);
Json::Value doPathFind (RPC::JsonContext&);
Json::Value doPause (RPC::JsonContext&);
Json::Value doPeers (RPC::JsonContext&);
Json::Value doPing (RPC::JsonContext&);
Json::Value doPrint (RPC::JsonContext&);
Json::Value doRandom (RPC::JsonContext&);
Json::Value doResume (RPC::JsonContext&);
Json::Value doPeerReservationsAdd (RPC::JsonContext&);
Json::Value doPeerReservationsDel (RPC::JsonContext&);
Json::Value doPeerReservationsList (RPC::JsonContext&);

View File

@@ -22,6 +22,9 @@
#include <ripple/core/ConfigSections.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/rpc/ShardArchiveHandler.h>
#include <ripple/basics/BasicConfig.h>
#include <ripple/rpc/impl/Handler.h>
#include <ripple/protocol/ErrorCodes.h>
#include <memory>
@@ -31,41 +34,192 @@ namespace RPC {
using namespace boost::filesystem;
using namespace std::chrono_literals;
ShardArchiveHandler::ShardArchiveHandler(Application& app)
: app_(app)
, downloadDir_(get(app_.config().section(
ConfigSection::shardDatabase()), "path", "") + "/download")
, timer_(app_.getIOService())
, process_(false)
, j_(app.journal("ShardArchiveHandler"))
std::mutex ShardArchiveHandler::instance_mutex_;
ShardArchiveHandler::pointer ShardArchiveHandler::instance_ = nullptr;
boost::filesystem::path
ShardArchiveHandler::getDownloadDirectory(Config const& config)
{
assert(app_.getShardStore());
return get(config.section(
ConfigSection::shardDatabase()), "download_path",
get(config.section(ConfigSection::shardDatabase()),
"path", "")) / "download";
}
ShardArchiveHandler::~ShardArchiveHandler()
auto
ShardArchiveHandler::getInstance() -> pointer
{
std::lock_guard lock(m_);
timer_.cancel();
for (auto const& ar : archives_)
app_.getShardStore()->removePreShard(ar.first);
archives_.clear();
std::lock_guard lock(instance_mutex_);
// Remove temp root download directory
try
{
remove_all(downloadDir_);
return instance_;
}
catch (std::exception const& e)
auto
ShardArchiveHandler::getInstance(Application& app,
Stoppable& parent) -> pointer
{
JLOG(j_.error()) <<
"exception: " << e.what();
std::lock_guard lock(instance_mutex_);
assert(!instance_);
instance_.reset(new ShardArchiveHandler(app, parent));
return instance_;
}
auto
ShardArchiveHandler::recoverInstance(Application& app, Stoppable& parent) -> pointer
{
std::lock_guard lock(instance_mutex_);
assert(!instance_);
instance_.reset(new ShardArchiveHandler(app, parent, true));
return instance_;
}
bool
ShardArchiveHandler::add(std::uint32_t shardIndex, parsedURL&& url)
ShardArchiveHandler::hasInstance()
{
std::lock_guard lock(instance_mutex_);
return instance_.get() != nullptr;
}
ShardArchiveHandler::ShardArchiveHandler(
Application& app,
Stoppable& parent,
bool recovery)
: Stoppable("ShardArchiveHandler", parent)
, app_(app)
, j_(app.journal("ShardArchiveHandler"))
, downloadDir_(getDownloadDirectory(app.config()))
, timer_(app_.getIOService())
, process_(false)
{
assert(app_.getShardStore());
if(recovery)
downloader_.reset(new DatabaseDownloader (
app_.getIOService(), j_, app_.config()));
}
bool
ShardArchiveHandler::init()
{
try
{
create_directories(downloadDir_);
sqliteDB_ = std::make_unique<DatabaseCon>(
downloadDir_ ,
stateDBName,
DownloaderDBPragma,
ShardArchiveHandlerDBInit);
}
catch(std::exception const& e)
{
JLOG(j_.error()) << "exception: " << e.what()
<< " in function: " << __func__;
return false;
}
return true;
}
bool
ShardArchiveHandler::initFromDB()
{
try
{
using namespace boost::filesystem;
assert(exists(downloadDir_ / stateDBName) &&
is_regular_file(downloadDir_ / stateDBName));
sqliteDB_ = std::make_unique<DatabaseCon>(
downloadDir_,
stateDBName,
DownloaderDBPragma,
ShardArchiveHandlerDBInit);
auto& session{sqliteDB_->getSession()};
soci::rowset<soci::row> rs = (session.prepare
<< "SELECT * FROM State;");
std::lock_guard<std::mutex> lock(m_);
for (auto it = rs.begin(); it != rs.end(); ++it)
{
parsedURL url;
if (!parseUrl(url, it->get<std::string>(1)))
{
JLOG(j_.error()) << "Failed to parse url: "
<< it->get<std::string>(1);
continue;
}
add(it->get<int>(0), std::move(url), lock);
}
// Failed to load anything
// from the state database.
if(archives_.empty())
{
release();
return false;
}
}
catch(std::exception const& e)
{
JLOG(j_.error()) << "exception: " << e.what()
<< " in function: " << __func__;
return false;
}
return true;
}
void
ShardArchiveHandler::onStop()
{
std::lock_guard<std::mutex> lock(m_);
if (downloader_)
{
downloader_->onStop();
downloader_.reset();
}
stopped();
}
bool
ShardArchiveHandler::add(std::uint32_t shardIndex,
std::pair<parsedURL, std::string>&& url)
{
std::lock_guard<std::mutex> lock(m_);
if (!add(shardIndex, std::forward<parsedURL>(url.first), lock))
return false;
auto& session{sqliteDB_->getSession()};
session << "INSERT INTO State VALUES (:index, :url);",
soci::use(shardIndex),
soci::use(url.second);
return true;
}
bool
ShardArchiveHandler::add(std::uint32_t shardIndex, parsedURL&& url,
std::lock_guard<std::mutex> const&)
{
std::lock_guard lock(m_);
if (process_)
{
JLOG(j_.error()) <<
@@ -76,9 +230,12 @@ ShardArchiveHandler::add(std::uint32_t shardIndex, parsedURL&& url)
auto const it {archives_.find(shardIndex)};
if (it != archives_.end())
return url == it->second;
if (!app_.getShardStore()->prepareShard(shardIndex))
return false;
archives_.emplace(shardIndex, std::move(url));
return true;
}
@@ -107,16 +264,13 @@ ShardArchiveHandler::start()
try
{
// Remove if remnant from a crash
remove_all(downloadDir_);
// Create temp root download directory
create_directory(downloadDir_);
create_directories(downloadDir_);
if (!downloader_)
{
// will throw if can't initialize ssl context
downloader_ = std::make_shared<SSLHTTPDownloader>(
downloader_ = std::make_shared<DatabaseDownloader>(
app_.getIOService(), j_, app_.config());
}
}
@@ -130,12 +284,19 @@ ShardArchiveHandler::start()
return next(lock);
}
void
ShardArchiveHandler::release()
{
std::lock_guard<std::mutex> lock(m_);
doRelease(lock);
}
bool
ShardArchiveHandler::next(std::lock_guard<std::mutex>& l)
{
if (archives_.empty())
{
process_ = false;
doRelease(l);
return false;
}
@@ -154,8 +315,14 @@ ShardArchiveHandler::next(std::lock_guard<std::mutex>& l)
return next(l);
}
// Download the archive
// Download the archive. Process in another thread
// to prevent holding up the lock if the downloader
// sleeps.
auto const& url {archives_.begin()->second};
app_.getJobQueue().addJob(
jtCLIENT, "ShardArchiveHandler",
[this, ptr = shared_from_this(), url, dstDir](Job&)
{
if (!downloader_->download(
url.domain,
std::to_string(url.port.get_value_or(443)),
@@ -163,11 +330,13 @@ ShardArchiveHandler::next(std::lock_guard<std::mutex>& l)
11,
dstDir / "archive.tar.lz4",
std::bind(&ShardArchiveHandler::complete,
shared_from_this(), std::placeholders::_1)))
ptr, std::placeholders::_1)))
{
std::lock_guard<std::mutex> l(m_);
remove(l);
return next(l);
next(l);
}
});
process_ = true;
return true;
@@ -206,7 +375,7 @@ ShardArchiveHandler::complete(path dstPath)
jtCLIENT, "ShardArchiveHandler",
[=, dstPath = std::move(dstPath), ptr = shared_from_this()](Job&)
{
// If validating and not synced then defer and retry
// If not synced then defer and retry
auto const mode {ptr->app_.getOPs().getOperatingMode()};
if (mode != OperatingMode::FULL)
{
@@ -282,6 +451,11 @@ ShardArchiveHandler::remove(std::lock_guard<std::mutex>&)
app_.getShardStore()->removePreShard(shardIndex);
archives_.erase(shardIndex);
auto& session{sqliteDB_->getSession()};
session << "DELETE FROM State WHERE ShardIndex = :index;",
soci::use(shardIndex);
auto const dstDir {downloadDir_ / std::to_string(shardIndex)};
try
{
@@ -294,5 +468,37 @@ ShardArchiveHandler::remove(std::lock_guard<std::mutex>&)
}
}
void
ShardArchiveHandler::doRelease(std::lock_guard<std::mutex> const&)
{
process_ = false;
timer_.cancel();
for (auto const& ar : archives_)
app_.getShardStore()->removePreShard(ar.first);
archives_.clear();
{
auto& session{sqliteDB_->getSession()};
session << "DROP TABLE State;";
}
sqliteDB_.reset();
// Remove temp root download directory
try
{
remove_all(downloadDir_);
}
catch (std::exception const& e)
{
JLOG(j_.error()) << "exception: " << e.what()
<< " in function: " << __func__;
}
downloader_.reset();
}
} // RPC
} // ripple

View File

@@ -17,11 +17,10 @@
*/
//==============================================================================
#include <ripple/net/SSLHTTPDownloader.h>
#include <ripple/net/DatabaseDownloader.h>
#include <test/jtx.h>
#include <test/jtx/TrustedPublisherServer.h>
#include <test/unit_test/FileDirGuard.h>
#include <boost/filesystem.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/predef.h>
#include <mutex>
@@ -82,15 +81,15 @@ class SSLHTTPDownloader_test : public beast::unit_test::suite
beast::Journal journal_;
// The SSLHTTPDownloader must be created as shared_ptr
// because it uses shared_from_this
std::shared_ptr<SSLHTTPDownloader> ptr_;
std::shared_ptr<DatabaseDownloader> ptr_;
Downloader(jtx::Env& env)
: journal_ {sink_}
, ptr_ {std::make_shared<SSLHTTPDownloader>(
, ptr_ {std::make_shared<DatabaseDownloader>(
env.app().getIOService(), journal_, env.app().config())}
{}
SSLHTTPDownloader* operator->()
DatabaseDownloader* operator->()
{
return ptr_.get();
}
@@ -104,6 +103,7 @@ class SSLHTTPDownloader_test : public beast::unit_test::suite
(verify ? "Verify" : "No Verify");
using namespace jtx;
ripple::test::detail::FileDirGuard cert {
*this, "_cert", "ca.pem", TrustedPublisherServer::ca_cert()};
@@ -152,22 +152,11 @@ class SSLHTTPDownloader_test : public beast::unit_test::suite
testFailures()
{
testcase("Error conditions");
using namespace jtx;
Env env {*this};
{
// file exists
Downloader dl {env};
ripple::test::detail::FileDirGuard const datafile {
*this, "downloads", "data", "file contents"};
BEAST_EXPECT(!dl->download(
"localhost",
"443",
"",
11,
datafile.file(),
std::function<void(boost::filesystem::path)> {std::ref(cb)}));
}
{
// bad hostname
boost::system::error_code ec;

View File

@@ -0,0 +1,364 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/beast/utility/temp_dir.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/nodestore/DummyScheduler.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/protocol/jss.h>
#include <ripple/rpc/ShardArchiveHandler.h>
#include <test/jtx/Env.h>
#include <test/jtx/TrustedPublisherServer.h>
#include <test/jtx/envconfig.h>
#include <test/nodestore/TestBase.h>
namespace ripple {
namespace test {
class ShardArchiveHandler_test : public beast::unit_test::suite
{
using Downloads = std::vector<std::pair<uint32_t, std::string>>;
TrustedPublisherServer
createServer(jtx::Env& env, bool ssl = true)
{
std::vector<TrustedPublisherServer::Validator> list;
list.push_back(TrustedPublisherServer::randomValidator());
return TrustedPublisherServer{
env.app().getIOService(),
list,
env.timeKeeper().now() + std::chrono::seconds{3600},
ssl};
}
public:
void
testStateDatabase1()
{
testcase("testStateDatabase1");
{
beast::temp_dir tempDir;
auto c = jtx::envconfig();
auto& section = c->section(ConfigSection::shardDatabase());
section.set("path", tempDir.path());
section.set("max_size_gb", "100");
c->setupControl(true, true, true);
jtx::Env env(*this, std::move(c));
auto handler = RPC::ShardArchiveHandler::getInstance(
env.app(), env.app().getJobQueue());
BEAST_EXPECT(handler);
BEAST_EXPECT(handler->init());
std::string const rawUrl = "https://foo:443/1.tar.lz4";
parsedURL url;
parseUrl(url, rawUrl);
handler->add(1, {url, rawUrl});
{
std::lock_guard<std::mutex> lock(handler->m_);
auto& session{handler->sqliteDB_->getSession()};
soci::rowset<soci::row> rs =
(session.prepare << "SELECT * FROM State;");
uint64_t rowCount = 0;
for (auto it = rs.begin(); it != rs.end(); ++it, ++rowCount)
{
BEAST_EXPECT(it->get<int>(0) == 1);
BEAST_EXPECT(it->get<std::string>(1) == rawUrl);
}
BEAST_EXPECT(rowCount == 1);
}
handler->release();
}
// Destroy the singleton so we start fresh in
// the next testcase.
RPC::ShardArchiveHandler::instance_.reset();
}
    // Verify that multiple queued downloads are all persisted to the
    // handler's state database, in insertion order.
    void
    testStateDatabase2()
    {
        testcase("testStateDatabase2");
        {
            // Configure a shard store rooted in a throwaway directory.
            beast::temp_dir tempDir;
            auto c = jtx::envconfig();
            auto& section = c->section(ConfigSection::shardDatabase());
            section.set("path", tempDir.path());
            section.set("max_size_gb", "100");
            c->setupControl(true, true, true);
            jtx::Env env(*this, std::move(c));
            // Create and initialize the singleton handler.
            auto handler = RPC::ShardArchiveHandler::getInstance(
                env.app(), env.app().getJobQueue());
            BEAST_EXPECT(handler);
            BEAST_EXPECT(handler->init());
            // Three distinct shard downloads to queue.
            Downloads const dl = {{1, "https://foo:443/1.tar.lz4"},
                                  {2, "https://foo:443/2.tar.lz4"},
                                  {3, "https://foo:443/3.tar.lz4"}};
            for (auto const& entry : dl)
            {
                parsedURL url;
                parseUrl(url, entry.second);
                handler->add(entry.first, {url, entry.second});
            }
            {
                // Each queued download should appear as one row; rows are
                // compared positionally against dl, so this relies on the
                // SELECT returning rows in insertion order.
                std::lock_guard<std::mutex> lock(handler->m_);
                auto& session{handler->sqliteDB_->getSession()};
                soci::rowset<soci::row> rs =
                    (session.prepare << "SELECT * FROM State;");
                uint64_t pos = 0;
                for (auto it = rs.begin(); it != rs.end(); ++it, ++pos)
                {
                    BEAST_EXPECT(it->get<int>(0) == dl[pos].first);
                    BEAST_EXPECT(it->get<std::string>(1) == dl[pos].second);
                }
                BEAST_EXPECT(pos == dl.size());
            }
            handler->release();
        }
        // Destroy the singleton so we start fresh in
        // the next testcase.
        RPC::ShardArchiveHandler::instance_.reset();
    }
    // Queue ten downloads against a stopped server, start the handler,
    // and verify that every queued archive is eventually drained and the
    // state directory is removed once the queue is empty.
    void
    testStateDatabase3()
    {
        testcase("testStateDatabase3");
        {
            // Configure a shard store rooted in a throwaway directory.
            beast::temp_dir tempDir;
            auto c = jtx::envconfig();
            auto& section = c->section(ConfigSection::shardDatabase());
            section.set("path", tempDir.path());
            section.set("max_size_gb", "100");
            c->setupControl(true, true, true);
            jtx::Env env(*this, std::move(c));
            auto handler = RPC::ShardArchiveHandler::getInstance(
                env.app(), env.app().getJobQueue());
            BEAST_EXPECT(handler);
            BEAST_EXPECT(handler->init());
            // Capture the server's address, then stop it: every download
            // attempt will fail, exercising the failure/removal path.
            auto server = createServer(env);
            auto host = server.local_endpoint().address().to_string();
            auto port = std::to_string(server.local_endpoint().port());
            server.stop();
            // Build ten download entries pointing at the (now stopped)
            // server.
            Downloads const dl = [&host, &port] {
                Downloads ret;
                for (int i = 1; i <= 10; ++i)
                {
                    ret.push_back({i,
                                   (boost::format("https://%s:%d/%d.tar.lz4") %
                                    host % port % i)
                                       .str()});
                }
                return ret;
            }();
            for (auto const& entry : dl)
            {
                parsedURL url;
                parseUrl(url, entry.second);
                handler->add(entry.first, {url, entry.second});
            }
            BEAST_EXPECT(handler->start());
            auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
                env.app().config());
            std::unique_lock<std::mutex> lock(handler->m_);
            // Either the state directory still exists (work pending) or
            // the queue has already fully drained.
            BEAST_EXPECT(
                boost::filesystem::exists(stateDir) ||
                handler->archives_.empty());
            // Poll until the handler has processed (and failed) every
            // queued download, releasing the lock between checks so the
            // handler's job-queue threads can make progress.
            while (!handler->archives_.empty())
            {
                lock.unlock();
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                lock.lock();
            }
            // With the queue empty the handler should have removed its
            // state directory.
            BEAST_EXPECT(!boost::filesystem::exists(stateDir));
        }
        // Destroy the singleton so we start fresh in
        // the next testcase.
        RPC::ShardArchiveHandler::instance_.reset();
    }
void
testStateDatabase4()
{
    testcase("testStateDatabase4");

    // tempDir outlives the first scope: it holds both the shard path and a
    // stashed copy of the state DB used to simulate an interrupted run.
    beast::temp_dir tempDir;

    {
        auto c = jtx::envconfig();
        auto& section = c->section(ConfigSection::shardDatabase());
        section.set("path", tempDir.path());
        section.set("max_size_gb", "100");
        c->setupControl(true, true, true);

        jtx::Env env(*this, std::move(c));
        auto handler = RPC::ShardArchiveHandler::getInstance(
            env.app(), env.app().getJobQueue());
        BEAST_EXPECT(handler);
        BEAST_EXPECT(handler->init());

        // Learn a host/port from a short-lived local server, then stop it
        // so every queued download fails.
        auto server = createServer(env);
        auto host = server.local_endpoint().address().to_string();
        auto port = std::to_string(server.local_endpoint().port());
        server.stop();

        // NOTE: 'port' is a std::string, so the correct Boost.Format
        // directive is %s (the original %d only happened to work because
        // Boost.Format treats directives as stream-flag hints).
        Downloads const dl = [&host, &port] {
            Downloads ret;
            for (int i = 1; i <= 10; ++i)
            {
                ret.push_back({i,
                    (boost::format("https://%s:%s/%d.tar.lz4") %
                     host % port % i)
                        .str()});
            }
            return ret;
        }();

        for (auto const& entry : dl)
        {
            parsedURL url;
            parseUrl(url, entry.second);
            handler->add(entry.first, {url, entry.second});
        }

        // Snapshot the populated state DB BEFORE starting the handler:
        // this copy is restored later to fake a crash mid-download.
        auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
            env.app().config());
        boost::filesystem::copy_file(
            stateDir / stateDBName,
            boost::filesystem::path(tempDir.path()) / stateDBName);

        BEAST_EXPECT(handler->start());

        // Hold the handler's lock while inspecting its queue; drop it
        // around each sleep so the worker can drain the (failing) jobs.
        std::unique_lock<std::mutex> lock(handler->m_);

        BEAST_EXPECT(
            boost::filesystem::exists(stateDir) ||
            handler->archives_.empty());

        while (!handler->archives_.empty())
        {
            lock.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            lock.lock();
        }

        // The handler removes its state directory when the queue drains;
        // recreate it and restore the snapshot so the next application
        // start finds a "pending" download to recover.
        BEAST_EXPECT(!boost::filesystem::exists(stateDir));
        boost::filesystem::create_directory(stateDir);
        boost::filesystem::copy_file(
            boost::filesystem::path(tempDir.path()) / stateDBName,
            stateDir / stateDBName);
    }

    // Destroy the singleton so we start fresh in
    // the new scope.
    RPC::ShardArchiveHandler::instance_.reset();

    // Second application start against the same shard path: setup should
    // detect the restored state DB and recover the handler automatically.
    auto c = jtx::envconfig();
    auto& section = c->section(ConfigSection::shardDatabase());
    section.set("path", tempDir.path());
    section.set("max_size_gb", "100");
    c->setupControl(true, true, true);

    jtx::Env env(*this, std::move(c));

    // Recovery happens on a job-queue thread; poll until the singleton
    // has been re-created.
    while (!RPC::ShardArchiveHandler::hasInstance())
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    BEAST_EXPECT(RPC::ShardArchiveHandler::hasInstance());

    auto handler = RPC::ShardArchiveHandler::getInstance();
    auto stateDir =
        RPC::ShardArchiveHandler::getDownloadDirectory(env.app().config());

    std::unique_lock<std::mutex> lock(handler->m_);

    BEAST_EXPECT(
        boost::filesystem::exists(stateDir) || handler->archives_.empty());

    // Wait for the recovered handler to drain the queue, then verify it
    // cleaned up its state directory.
    while (!handler->archives_.empty())
    {
        lock.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        lock.lock();
    }

    BEAST_EXPECT(!boost::filesystem::exists(stateDir));
}
void
run() override
{
    // Execution order matters: each testcase assumes the
    // ShardArchiveHandler singleton was reset by its predecessor, and
    // testStateDatabase4 additionally exercises recovery-at-startup.
    testStateDatabase1();
    testStateDatabase2();
    testStateDatabase3();
    testStateDatabase4();
}
};
BEAST_DEFINE_TESTSUITE(ShardArchiveHandler, app, ripple);
} // namespace test
} // namespace ripple