Support shard downloading via HTTP or HTTPS

This commit is contained in:
Devon White
2020-08-11 12:10:10 -04:00
committed by Nik Bougalis
parent a8d481c2a5
commit 2bba79138f
10 changed files with 491 additions and 101 deletions

View File

@@ -497,12 +497,13 @@ target_sources (rippled PRIVATE
#]===============================]
src/ripple/net/impl/DatabaseDownloader.cpp
src/ripple/net/impl/HTTPClient.cpp
src/ripple/net/impl/HTTPDownloader.cpp
src/ripple/net/impl/HTTPStream.cpp
src/ripple/net/impl/InfoSub.cpp
src/ripple/net/impl/RPCCall.cpp
src/ripple/net/impl/RPCErr.cpp
src/ripple/net/impl/RPCSub.cpp
src/ripple/net/impl/RegisterSSLCerts.cpp
src/ripple/net/impl/SSLHTTPDownloader.cpp
#[===============================[
main sources:
subdir: nodestore

View File

@@ -21,11 +21,11 @@
#define RIPPLE_NET_DATABASEDOWNLOADER_H
#include <ripple/net/DatabaseBody.h>
#include <ripple/net/SSLHTTPDownloader.h>
#include <ripple/net/HTTPDownloader.h>
namespace ripple {
class DatabaseDownloader : public SSLHTTPDownloader
class DatabaseDownloader : public HTTPDownloader
{
public:
DatabaseDownloader(

View File

@@ -17,12 +17,12 @@
*/
//==============================================================================
#ifndef RIPPLE_NET_SSLHTTPDOWNLOADER_H_INCLUDED
#define RIPPLE_NET_SSLHTTPDOWNLOADER_H_INCLUDED
#ifndef RIPPLE_NET_HTTPDOWNLOADER_H_INCLUDED
#define RIPPLE_NET_HTTPDOWNLOADER_H_INCLUDED
#include <ripple/basics/Log.h>
#include <ripple/core/Config.h>
#include <ripple/net/HTTPClientSSLContext.h>
#include <ripple/net/HTTPStream.h>
#include <boost/asio/connect.hpp>
#include <boost/asio/io_service.hpp>
@@ -39,14 +39,14 @@
namespace ripple {
/** Provides an asynchronous HTTPS file downloader
/** Provides an asynchronous HTTP[S] file downloader
*/
class SSLHTTPDownloader
class HTTPDownloader
{
public:
using error_code = boost::system::error_code;
SSLHTTPDownloader(
HTTPDownloader(
boost::asio::io_service& io_service,
beast::Journal j,
Config const& config);
@@ -58,12 +58,13 @@ public:
std::string const& target,
int version,
boost::filesystem::path const& dstPath,
std::function<void(boost::filesystem::path)> complete);
std::function<void(boost::filesystem::path)> complete,
bool ssl = true);
void
onStop();
virtual ~SSLHTTPDownloader() = default;
virtual ~HTTPDownloader() = default;
protected:
using parser = boost::beast::http::basic_parser<false>;
@@ -79,10 +80,9 @@ protected:
std::shared_ptr<parser> parser);
private:
HTTPClientSSLContext ssl_ctx_;
Config const& config_;
boost::asio::io_service::strand strand_;
boost::optional<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>
stream_;
std::unique_ptr<HTTPStream> stream_;
boost::beast::flat_buffer read_buf_;
std::atomic<bool> cancelDownloads_;
@@ -99,6 +99,7 @@ private:
int version,
boost::filesystem::path dstPath,
std::function<void(boost::filesystem::path)> complete,
bool ssl,
boost::asio::yield_context yield);
virtual std::shared_ptr<parser>

158
src/ripple/net/HTTPStream.h Normal file
View File

@@ -0,0 +1,158 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_NET_HTTPSTREAM_H_INCLUDED
#define RIPPLE_NET_HTTPSTREAM_H_INCLUDED
#include <ripple/core/Config.h>
#include <ripple/net/HTTPClientSSLContext.h>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
namespace ripple {
class HTTPStream
{
public:
using request = boost::beast::http::request<boost::beast::http::empty_body>;
using parser = boost::beast::http::basic_parser<false>;
virtual ~HTTPStream() = default;
template <class T>
static std::unique_ptr<HTTPStream>
makeUnique(
Config const& config,
boost::asio::io_service::strand& strand,
beast::Journal j)
{
return std::make_unique<T>(config, strand, j);
}
[[nodiscard]] virtual boost::asio::ip::tcp::socket&
getStream() = 0;
[[nodiscard]] virtual bool
connect(
std::string& errorOut,
std::string const host,
std::string const port,
boost::asio::yield_context& yield) = 0;
virtual void
asyncWrite(
request& req,
boost::asio::yield_context& yield,
boost::system::error_code& ec) = 0;
virtual void
asyncRead(
boost::beast::flat_buffer& buf,
parser& p,
bool readSome,
boost::asio::yield_context& yield,
boost::system::error_code& ec) = 0;
};
class SSLStream : public HTTPStream
{
public:
SSLStream(
Config const& config,
boost::asio::io_service::strand& strand,
beast::Journal j);
virtual ~SSLStream() = default;
boost::asio::ip::tcp::socket&
getStream() override;
bool
connect(
std::string& errorOut,
std::string const host,
std::string const port,
boost::asio::yield_context& yield) override;
void
asyncWrite(
request& req,
boost::asio::yield_context& yield,
boost::system::error_code& ec) override;
void
asyncRead(
boost::beast::flat_buffer& buf,
parser& p,
bool readSome,
boost::asio::yield_context& yield,
boost::system::error_code& ec) override;
private:
HTTPClientSSLContext ssl_ctx_;
boost::optional<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>
stream_;
boost::asio::io_service::strand& strand_;
};
class RawStream : public HTTPStream
{
public:
RawStream(
Config const& config,
boost::asio::io_service::strand& strand,
beast::Journal j);
virtual ~RawStream() = default;
boost::asio::ip::tcp::socket&
getStream() override;
bool
connect(
std::string& errorOut,
std::string const host,
std::string const port,
boost::asio::yield_context& yield) override;
void
asyncWrite(
request& req,
boost::asio::yield_context& yield,
boost::system::error_code& ec) override;
void
asyncRead(
boost::beast::flat_buffer& buf,
parser& p,
bool readSome,
boost::asio::yield_context& yield,
boost::system::error_code& ec) override;
private:
boost::optional<boost::asio::ip::tcp::socket> stream_;
boost::asio::io_service::strand& strand_;
};
} // namespace ripple
#endif // RIPPLE_NET_HTTPSTREAM_H

View File

@@ -2,8 +2,12 @@
## Overview
This document describes mechanics of the `SSLHTTPDownloader`, a class that performs the task of downloading shards from remote web servers via
SSL HTTP. The downloader utilizes a strand (`boost::asio::io_service::strand`) to ensure that downloads are never executed concurrently. Hence, if a download is in progress when another download is initiated, the second download will be queued and invoked only when the first download is completed.
This document describes mechanics of the `HTTPDownloader`, a class that performs
the task of downloading shards from remote web servers via HTTP. The downloader
utilizes a strand (`boost::asio::io_service::strand`) to ensure that downloads
are never executed concurrently. Hence, if a download is in progress when
another download is initiated, the second download will be queued and invoked
only when the first download is completed.
## Motivation
@@ -18,13 +22,15 @@ This document was created to document the changes introduced by this change.
Much of the shard downloading process concerns the following classes:
- `SSLHTTPDownloader`
- `HTTPDownloader`
This is a generic class designed for serially executing downloads via HTTP SSL.
This is a generic class designed for serially executing downloads via HTTP.
- `ShardArchiveHandler`
This class uses the `SSLHTTPDownloader` to fetch shards from remote web servers. Additionally, the archive handler performs sanity checks on the downloaded files and imports the validated files into the local shard store.
This class uses the `HTTPDownloader` to fetch shards from remote web servers.
Additionally, the archive handler performs validity checks on the downloaded
files and imports the validated files into the local shard store.
The `ShardArchiveHandler` exposes a simple public interface:
@@ -43,35 +49,45 @@ Much of the shard downloading process concerns the following classes:
start();
```
When a client submits a `download_shard` command via the RPC interface, each of the requested files is registered with the handler via the `add` method. After all the files have been registered, the handler's `start` method is invoked, which in turn creates an instance of the `SSLHTTPDownloader` and begins the first download. When the download is completed, the downloader invokes the handler's `complete` method, which will initiate the download of the next file, or simply return if there are no more downloads to process. When `complete` is invoked with no remaining files to be downloaded, the handler and downloader are not destroyed automatically, but persist for the duration of the application to assist with graceful shutdowns by `Stoppable`.
When a client submits a `download_shard` command via the RPC interface, each
of the requested files is registered with the handler via the `add` method.
After all the files have been registered, the handler's `start` method is
invoked, which in turn creates an instance of the `HTTPDownloader` and begins
the first download. When the download is completed, the downloader invokes
the handler's `complete` method, which will initiate the download of the next
file, or simply return if there are no more downloads to process. When
`complete` is invoked with no remaining files to be downloaded, the handler
and downloader are not destroyed automatically, but persist for the duration
of the application to assist with graceful shutdowns by `Stoppable`.
- `DatabaseBody`
This class defines a custom message body type, allowing an `http::response_parser` to write to an SQLite database rather than to a flat file. This class is discussed in further detail in the Recovery section.
This class defines a custom message body type, allowing an
`http::response_parser` to write to an SQLite database rather than to a flat
file. This class is discussed in further detail in the Recovery section.
## Graceful Shutdowns & Recovery
This section describes in greater detail how the shutdown and recovery features of the downloader are implemented in C++ using the `boost::asio` framework.
This section describes in greater detail how the shutdown and recovery features
of the downloader are implemented in C++ using the `boost::asio` framework.
##### Member Variables:
The variables shown here are members of the `SSLHTTPDownloader` class and
The variables shown here are members of the `HTTPDownloader` class and
will be used in the following code examples.
```c++
using boost::asio::ssl::stream;
using boost::asio::ip::tcp::socket;
stream<socket> stream_;
std::condition_variable c_;
std::atomic<bool> cancelDownloads_;
std::unique_ptr<HTTPStream> stream_;
std::condition_variable c_;
std::atomic<bool> cancelDownloads_;
```
### Graceful Shutdowns
##### Thread 1:
A graceful shutdown begins when the `onStop()` method of the `ShardArchiveHandler` is invoked:
A graceful shutdown begins when the `onStop()` method of the
`ShardArchiveHandler` is invoked:
```c++
void
@@ -89,11 +105,13 @@ ShardArchiveHandler::onStop()
}
```
Inside of `SSLHTTPDownloader::onStop()`, if a download is currently in progress, the `cancelDownloads_` member variable is set and the thread waits for the download to stop:
Inside of `HTTPDownloader::onStop()`, if a download is currently in progress,
the `cancelDownloads_` member variable is set and the thread waits for the
download to stop:
```c++
void
SSLHTTPDownloader::onStop()
HTTPDownloader::onStop()
{
std::unique_lock lock(m_);
@@ -113,10 +131,14 @@ SSLHTTPDownloader::onStop()
##### Thread 2:
The graceful shutdown is realized when the thread executing the download polls `cancelDownloads_` after this variable has been set to `true`. Polling occurs while the file is being downloaded, in between calls to `async_read_some()`. The stop takes effect when the socket is closed and the handler function ( `do_session()` ) is exited.
The graceful shutdown is realized when the thread executing the download polls
`cancelDownloads_` after this variable has been set to `true`. Polling occurs
while the file is being downloaded, in between calls to `async_read_some()`. The
stop takes effect when the socket is closed and the handler function (
`do_session()` ) is exited.
```c++
void SSLHTTPDownloader::do_session()
void HTTPDownloader::do_session()
{
// (Connection initialization logic) . . .
@@ -137,11 +159,19 @@ void SSLHTTPDownloader::do_session()
### Recovery
Persisting the current state of both the archive handler and the downloader is achieved by leveraging an SQLite database rather than flat files, as the database protects against data corruption that could result from a system crash.
Persisting the current state of both the archive handler and the downloader is
achieved by leveraging an SQLite database rather than flat files, as the
database protects against data corruption that could result from a system crash.
##### ShardArchiveHandler
Although `SSLHTTPDownloader` is a generic class that could be used to download a variety of file types, currently it is used exclusively by the `ShardArchiveHandler` to download shards. In order to provide resilience, the `ShardArchiveHandler` will use an SQLite database to preserve its current state whenever there are active, paused, or queued downloads. The `shard_db` section in the configuration file allows users to specify the location of the database to use for this purpose.
Although `HTTPDownloader` is a generic class that could be used to download a
variety of file types, currently it is used exclusively by the
`ShardArchiveHandler` to download shards. In order to provide resilience, the
`ShardArchiveHandler` will use an SQLite database to preserve its current state
whenever there are active, paused, or queued downloads. The `shard_db` section
in the configuration file allows users to specify the location of the database
to use for this purpose.
###### SQLite Table Format
@@ -151,13 +181,19 @@ Although `SSLHTTPDownloader` is a generic class that could be used to download a
| 2 | ht<span />tps://example.com/2.tar.lz4 |
| 5 | ht<span />tps://example.com/5.tar.lz4 |
##### SSLHTTPDownloader
##### HTTPDownloader
While the archive handler maintains a list of all partial and queued downloads, the `SSLHTTPDownloader` stores the raw bytes of the file currently being downloaded. The partially downloaded file will be represented as one or more `BLOB` entries in an SQLite database. As the maximum size of a `BLOB` entry is currently limited to roughly 2.1 GB, a 5 GB shard file for instance will occupy three database entries upon completion.
While the archive handler maintains a list of all partial and queued downloads,
the `HTTPDownloader` stores the raw bytes of the file currently being
downloaded. The partially downloaded file will be represented as one or more
`BLOB` entries in an SQLite database. As the maximum size of a `BLOB` entry is
currently limited to roughly 2.1 GB, a 5 GB shard file for instance will occupy
three database entries upon completion.
###### SQLite Table Format
Since downloads execute serially by design, the entries in this table always correspond to the contents of a single file.
Since downloads execute serially by design, the entries in this table always
correspond to the contents of a single file.
| Bytes | Size | Part |
|:------:|:----------:|:----:|
@@ -166,7 +202,9 @@ Since downloads execute serially by design, the entries in this table always cor
| 0x... | 705032706 | 2 |
##### Config File Entry
The `download_path` field of the `shard_db` entry is used to determine where to store the recovery database. If this field is omitted, the `path` field will be used instead.
The `download_path` field of the `shard_db` entry is used to determine where to
store the recovery database. If this field is omitted, the `path` field will be
used instead.
```dosini
# This is the persistent datastore for shards. It is important for the health
@@ -181,7 +219,9 @@ max_historical_shards=50
```
##### Resuming Partial Downloads
When resuming downloads after a shutdown, crash, or other interruption, the `SSLHTTPDownloader` will utilize the `range` field of the HTTP header to download only the remainder of the partially downloaded file.
When resuming downloads after a shutdown, crash, or other interruption, the
`HTTPDownloader` will utilize the `range` field of the HTTP header to download
only the remainder of the partially downloaded file.
```C++
auto downloaded = getPartialFileSize();
@@ -214,7 +254,15 @@ else
##### DatabaseBody
Previously, the `SSLHTTPDownloader` leveraged an `http::response_parser` instantiated with an `http::file_body`. The `file_body` class declares a nested type, `reader`, which does the task of writing HTTP message payloads (constituting a requested file) to the filesystem. In order for the `http::response_parser` to interface with the database, we implement a custom body type that declares a nested `reader` type which has been outfitted to persist octects received from the remote host to a local SQLite database. The code snippet below illustrates the customization points available to user-defined body types:
Previously, the `HTTPDownloader` leveraged an `http::response_parser`
instantiated with an `http::file_body`. The `file_body` class declares a nested
type, `reader`, which does the task of writing HTTP message payloads
(constituting a requested file) to the filesystem. In order for the
`http::response_parser` to interface with the database, we implement a custom
body type that declares a nested `reader` type which has been outfitted to
persist octects received from the remote host to a local SQLite database. The
code snippet below illustrates the customization points available to
user-defined body types:
```C++
/// Defines a Body type
@@ -223,7 +271,8 @@ struct body
/// This determines the return type of the `message::body` member function
using value_type = ...;
/// An optional function, returns the body's payload size (which may be zero)
/// An optional function, returns the body's payload size (which may be
/// zero)
static
std::uint64_t
size(value_type const& v);
@@ -236,9 +285,11 @@ struct body
}
```
Note that the `DatabaseBody` class is specifically designed to work with `asio` and follows `asio` conventions.
Note that the `DatabaseBody` class is specifically designed to work with `asio`
and follows `asio` conventions.
The method invoked to write data to the filesystem (or SQLite database in our case) has the following signature:
The method invoked to write data to the filesystem (or SQLite database in our
case) has the following signature:
```C++
std::size_t
@@ -247,7 +298,9 @@ body::reader::put(ConstBufferSequence const& buffers, error_code& ec);
## Sequence Diagram
This sequence diagram demonstrates a scenario wherein the `ShardArchiveHandler` leverages the state persisted in the database to recover from a crash and resume the requested downloads.
This sequence diagram demonstrates a scenario wherein the `ShardArchiveHandler`
leverages the state persisted in the database to recover from a crash and resume
the requested downloads.
![alt_text](./images/interrupt_sequence.png "Resuming downloads post abort")

View File

@@ -25,7 +25,7 @@ DatabaseDownloader::DatabaseDownloader(
boost::asio::io_service& io_service,
beast::Journal j,
Config const& config)
: SSLHTTPDownloader(io_service, j, config)
: HTTPDownloader(io_service, j, config)
, config_(config)
, io_service_(io_service)
{

View File

@@ -17,17 +17,17 @@
*/
//==============================================================================
#include <ripple/net/SSLHTTPDownloader.h>
#include <ripple/net/HTTPDownloader.h>
#include <boost/asio/ssl.hpp>
namespace ripple {
SSLHTTPDownloader::SSLHTTPDownloader(
HTTPDownloader::HTTPDownloader(
boost::asio::io_service& io_service,
beast::Journal j,
Config const& config)
: j_(j)
, ssl_ctx_(config, j, boost::asio::ssl::context::tlsv12_client)
, config_(config)
, strand_(io_service)
, cancelDownloads_(false)
, sessionActive_(false)
@@ -35,13 +35,14 @@ SSLHTTPDownloader::SSLHTTPDownloader(
}
bool
SSLHTTPDownloader::download(
HTTPDownloader::download(
std::string const& host,
std::string const& port,
std::string const& target,
int version,
boost::filesystem::path const& dstPath,
std::function<void(boost::filesystem::path)> complete)
std::function<void(boost::filesystem::path)> complete,
bool ssl)
{
if (!checkPath(dstPath))
return false;
@@ -57,19 +58,20 @@ SSLHTTPDownloader::download(
if (!strand_.running_in_this_thread())
strand_.post(std::bind(
&SSLHTTPDownloader::download,
&HTTPDownloader::download,
this,
host,
port,
target,
version,
dstPath,
complete));
complete,
ssl));
else
boost::asio::spawn(
strand_,
std::bind(
&SSLHTTPDownloader::do_session,
&HTTPDownloader::do_session,
this,
host,
port,
@@ -77,12 +79,13 @@ SSLHTTPDownloader::download(
version,
dstPath,
complete,
ssl,
std::placeholders::_1));
return true;
}
void
SSLHTTPDownloader::onStop()
HTTPDownloader::onStop()
{
std::unique_lock lock(m_);
@@ -96,13 +99,14 @@ SSLHTTPDownloader::onStop()
}
void
SSLHTTPDownloader::do_session(
HTTPDownloader::do_session(
std::string const host,
std::string const port,
std::string const target,
int version,
boost::filesystem::path dstPath,
std::function<void(boost::filesystem::path)> complete,
bool ssl,
boost::asio::yield_context yield)
{
using namespace boost::asio;
@@ -118,17 +122,18 @@ SSLHTTPDownloader::do_session(
closeBody(p);
// Gracefully close the stream
stream_->async_shutdown(yield[ec]);
stream_->getStream().shutdown(socket_base::shutdown_both, ec);
if (ec == boost::asio::error::eof)
ec.assign(0, ec.category());
if (ec)
{
// Most web servers don't bother with performing
// the SSL shutdown handshake, for speed.
JLOG(j_.trace()) << "async_shutdown: " << ec.message();
JLOG(j_.trace()) << "shutdown: " << ec.message();
}
// The socket cannot be reused
stream_ = boost::none;
// The stream cannot be reused
stream_.reset();
};
// When the downloader is being stopped
@@ -161,36 +166,12 @@ SSLHTTPDownloader::do_session(
// connection:
std::uint64_t const rangeStart = size(p);
ip::tcp::resolver resolver{strand_.context()};
auto const results = resolver.async_resolve(host, port, yield[ec]);
if (ec)
return failAndExit("async_resolve", p);
stream_ = ssl ? HTTPStream::makeUnique<SSLStream>(config_, strand_, j_)
: HTTPStream::makeUnique<RawStream>(config_, strand_, j_);
try
{
stream_.emplace(strand_.context(), ssl_ctx_.context());
}
catch (std::exception const& e)
{
return failAndExit(std::string("exception: ") + e.what(), p);
}
ec = ssl_ctx_.preConnectVerify(*stream_, host);
if (ec)
return failAndExit("preConnectVerify", p);
boost::asio::async_connect(
stream_->next_layer(), results.begin(), results.end(), yield[ec]);
if (ec)
return failAndExit("async_connect", p);
ec = ssl_ctx_.postConnectVerify(*stream_, host);
if (ec)
return failAndExit("postConnectVerify", p);
stream_->async_handshake(ssl::stream_base::client, yield[ec]);
if (ec)
return failAndExit("async_handshake", p);
std::string error;
if (!stream_->connect(error, host, port, yield))
return failAndExit(error, p);
// Set up an HTTP HEAD request message to find the file size
http::request<http::empty_body> req{http::verb::head, target, version};
@@ -205,7 +186,7 @@ SSLHTTPDownloader::do_session(
(boost::format("bytes=%llu-") % rangeStart).str());
}
http::async_write(*stream_, req, yield[ec]);
stream_->asyncWrite(req, yield, ec);
if (ec)
return failAndExit("async_write", p);
@@ -213,7 +194,7 @@ SSLHTTPDownloader::do_session(
// Read the response
http::response_parser<http::empty_body> connectParser;
connectParser.skip(true);
http::async_read(*stream_, read_buf_, connectParser, yield[ec]);
stream_->asyncRead(read_buf_, connectParser, false, yield, ec);
if (ec)
return failAndExit("async_read", p);
@@ -222,14 +203,14 @@ SSLHTTPDownloader::do_session(
{
req.erase(http::field::range);
http::async_write(*stream_, req, yield[ec]);
stream_->asyncWrite(req, yield, ec);
if (ec)
return failAndExit("async_write_range_verify", p);
http::response_parser<http::empty_body> rangeParser;
rangeParser.skip(true);
http::async_read(*stream_, read_buf_, rangeParser, yield[ec]);
stream_->asyncRead(read_buf_, rangeParser, false, yield, ec);
if (ec)
return failAndExit("async_read_range_verify", p);
@@ -280,7 +261,7 @@ SSLHTTPDownloader::do_session(
}
}
http::async_write(*stream_, req, yield[ec]);
stream_->asyncWrite(req, yield, ec);
if (ec)
return failAndExit("async_write", p);
@@ -299,7 +280,7 @@ SSLHTTPDownloader::do_session(
return exit();
}
http::async_read_some(*stream_, read_buf_, *p, yield[ec]);
stream_->asyncRead(read_buf_, *p, true, yield, ec);
}
JLOG(j_.trace()) << "download completed: " << dstPath.string();
@@ -312,7 +293,7 @@ SSLHTTPDownloader::do_session(
}
void
SSLHTTPDownloader::fail(
HTTPDownloader::fail(
boost::filesystem::path dstPath,
std::function<void(boost::filesystem::path)> const& complete,
boost::system::error_code const& ec,

View File

@@ -0,0 +1,191 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/net/HTTPStream.h>
namespace ripple {
SSLStream::SSLStream(
Config const& config,
boost::asio::io_service::strand& strand,
beast::Journal j)
: ssl_ctx_(config, j, boost::asio::ssl::context::tlsv12_client)
, strand_(strand)
{
}
boost::asio::ip::tcp::socket&
SSLStream::getStream()
{
assert(stream_);
return stream_->next_layer();
}
bool
SSLStream::connect(
std::string& errorOut,
std::string const host,
std::string const port,
boost::asio::yield_context& yield)
{
using namespace boost::asio;
using namespace boost::beast;
boost::system::error_code ec;
auto fail = [&errorOut](std::string const& errorIn) {
errorOut = errorIn;
return false;
};
ip::tcp::resolver resolver{strand_.context()};
auto const endpoints = resolver.async_resolve(host, port, yield[ec]);
if (ec)
return fail("async_resolve");
try
{
stream_.emplace(strand_.context(), ssl_ctx_.context());
}
catch (std::exception const& e)
{
return fail(std::string("exception: ") + e.what());
}
ec = ssl_ctx_.preConnectVerify(*stream_, host);
if (ec)
return fail("preConnectVerify");
boost::asio::async_connect(
stream_->next_layer(), endpoints.begin(), endpoints.end(), yield[ec]);
if (ec)
return fail("async_connect");
ec = ssl_ctx_.postConnectVerify(*stream_, host);
if (ec)
return fail("postConnectVerify");
stream_->async_handshake(ssl::stream_base::client, yield[ec]);
if (ec)
return fail("async_handshake");
return true;
}
void
SSLStream::asyncWrite(
request& req,
boost::asio::yield_context& yield,
boost::system::error_code& ec)
{
boost::beast::http::async_write(*stream_, req, yield[ec]);
}
void
SSLStream::asyncRead(
boost::beast::flat_buffer& buf,
parser& p,
bool readSome,
boost::asio::yield_context& yield,
boost::system::error_code& ec)
{
if (readSome)
boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
else
boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
}
RawStream::RawStream(
Config const& config,
boost::asio::io_service::strand& strand,
beast::Journal j)
: strand_(strand)
{
}
boost::asio::ip::tcp::socket&
RawStream::getStream()
{
assert(stream_);
return *stream_;
}
bool
RawStream::connect(
std::string& errorOut,
std::string const host,
std::string const port,
boost::asio::yield_context& yield)
{
using namespace boost::asio;
using namespace boost::beast;
boost::system::error_code ec;
auto fail = [&errorOut](std::string const& errorIn) {
errorOut = errorIn;
return false;
};
ip::tcp::resolver resolver{strand_.context()};
auto const endpoints = resolver.async_resolve(host, port, yield[ec]);
if (ec)
return fail("async_resolve");
try
{
stream_.emplace(strand_.context());
}
catch (std::exception const& e)
{
return fail(std::string("exception: ") + e.what());
}
boost::asio::async_connect(
*stream_, endpoints.begin(), endpoints.end(), yield[ec]);
if (ec)
return fail("async_connect");
return true;
}
void
RawStream::asyncWrite(
request& req,
boost::asio::yield_context& yield,
boost::system::error_code& ec)
{
boost::beast::http::async_write(*stream_, req, yield[ec]);
}
void
RawStream::asyncRead(
boost::beast::flat_buffer& buf,
parser& p,
bool readSome,
boost::asio::yield_context& yield,
boost::system::error_code& ec)
{
if (readSome)
boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
else
boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
}
} // namespace ripple

View File

@@ -99,8 +99,9 @@ doDownloadShard(RPC::JsonContext& context)
{
return RPC::invalid_field_error(jss::url);
}
if (url.scheme != "https")
return RPC::expected_field_error(std::string(jss::url), "HTTPS");
if (url.scheme != "https" && url.scheme != "http")
return RPC::expected_field_error(
std::string(jss::url), "HTTPS or HTTP");
// URL must point to an lz4 compressed tar archive '.tar.lz4'
auto archiveName{url.path.substr(url.path.find_last_of("/\\") + 1)};

View File

@@ -375,13 +375,17 @@ ShardArchiveHandler::next(std::lock_guard<std::mutex> const& l)
// sleeps.
auto const& url{archives_.begin()->second};
auto wrapper = jobCounter_.wrap([this, url, dstDir](Job&) {
auto const ssl = (url.scheme == "https");
auto const defaultPort = ssl ? 443 : 80;
if (!downloader_->download(
url.domain,
std::to_string(url.port.get_value_or(443)),
std::to_string(url.port.get_value_or(defaultPort)),
url.path,
11,
dstDir / "archive.tar.lz4",
[this](path dstPath) { complete(dstPath); }))
[this](path dstPath) { complete(dstPath); },
ssl))
{
std::lock_guard<std::mutex> l(m_);
removeAndProceed(l);