Mirror of https://github.com/Xahau/xahaud.git
Download queued shards only if there is space for them all
@@ -86,6 +86,19 @@ DatabaseCon::~DatabaseCon()
     if (checkpointer_)
     {
         checkpointers.erase(checkpointer_->id());
+
+        std::weak_ptr<Checkpointer> wk(checkpointer_);
+        checkpointer_.reset();
+
+        // The references to our Checkpointer held by 'checkpointer_' and
+        // 'checkpointers' have been removed, so if the use count is nonzero, a
+        // checkpoint is currently in progress. Wait for it to end, otherwise
+        // creating a new DatabaseCon to the same database may fail due to the
+        // database being locked by our (now old) Checkpointer.
+        while (wk.use_count())
+        {
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        }
     }
 }
 
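The destructor above drops both strong references to its Checkpointer and then polls the remaining weak reference until the use count reaches zero, so an in-flight checkpoint finishes before the same database can be reopened. A minimal standalone sketch of that wait pattern (the Checkpointer struct, timings, and main() are invented for illustration, not taken from this codebase):

    #include <chrono>
    #include <memory>
    #include <thread>

    struct Checkpointer
    {
        void
        checkpoint()
        {
            // Simulate a checkpoint that takes a while to complete.
            std::this_thread::sleep_for(std::chrono::milliseconds(250));
        }
    };

    int
    main()
    {
        auto owner = std::make_shared<Checkpointer>();

        // A worker holds its own shared_ptr while the checkpoint runs,
        // keeping the use count above zero.
        std::thread worker([cp = owner] { cp->checkpoint(); });

        // Drop the owning reference; only a weak_ptr remains here.
        std::weak_ptr<Checkpointer> wk(owner);
        owner.reset();

        // Wait until nobody else holds the Checkpointer before "reopening".
        while (wk.use_count())
            std::this_thread::sleep_for(std::chrono::milliseconds(10));

        worker.join();
    }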
@@ -38,24 +38,14 @@ public:
 
     virtual ~HTTPStream() = default;
 
-    template <class T>
-    static std::unique_ptr<HTTPStream>
-    makeUnique(
-        Config const& config,
-        boost::asio::io_service::strand& strand,
-        beast::Journal j)
-    {
-        return std::make_unique<T>(config, strand, j);
-    }
-
     [[nodiscard]] virtual boost::asio::ip::tcp::socket&
     getStream() = 0;
 
     [[nodiscard]] virtual bool
     connect(
         std::string& errorOut,
-        std::string const host,
-        std::string const port,
+        std::string const& host,
+        std::string const& port,
         boost::asio::yield_context& yield) = 0;
 
     virtual void
@@ -68,7 +58,13 @@ public:
     asyncRead(
         boost::beast::flat_buffer& buf,
         parser& p,
-        bool readSome,
+        boost::asio::yield_context& yield,
+        boost::system::error_code& ec) = 0;
+
+    virtual void
+    asyncReadSome(
+        boost::beast::flat_buffer& buf,
+        parser& p,
         boost::asio::yield_context& yield,
         boost::system::error_code& ec) = 0;
 };
@@ -89,8 +85,8 @@ public:
     bool
     connect(
         std::string& errorOut,
-        std::string const host,
-        std::string const port,
+        std::string const& host,
+        std::string const& port,
         boost::asio::yield_context& yield) override;
 
     void
@@ -103,7 +99,13 @@ public:
     asyncRead(
         boost::beast::flat_buffer& buf,
         parser& p,
-        bool readSome,
+        boost::asio::yield_context& yield,
+        boost::system::error_code& ec) override;
+
+    void
+    asyncReadSome(
+        boost::beast::flat_buffer& buf,
+        parser& p,
         boost::asio::yield_context& yield,
         boost::system::error_code& ec) override;
 
@@ -117,10 +119,7 @@ private:
 class RawStream : public HTTPStream
 {
 public:
-    RawStream(
-        Config const& config,
-        boost::asio::io_service::strand& strand,
-        beast::Journal j);
+    RawStream(boost::asio::io_service::strand& strand);
 
     virtual ~RawStream() = default;
 
@@ -130,8 +129,8 @@ public:
     bool
     connect(
         std::string& errorOut,
-        std::string const host,
-        std::string const port,
+        std::string const& host,
+        std::string const& port,
         boost::asio::yield_context& yield) override;
 
     void
@@ -144,7 +143,13 @@ public:
     asyncRead(
         boost::beast::flat_buffer& buf,
         parser& p,
-        bool readSome,
+        boost::asio::yield_context& yield,
+        boost::system::error_code& ec) override;
+
+    void
+    asyncReadSome(
+        boost::beast::flat_buffer& buf,
+        parser& p,
         boost::asio::yield_context& yield,
         boost::system::error_code& ec) override;
 
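The header changes above drop the makeUnique factory template, pass host and port by const reference, and replace the bool readSome flag on asyncRead with two separately named virtuals, asyncRead and asyncReadSome, so each call site states its intent directly. A minimal sketch of that flag-to-two-functions refactoring on a toy interface (StreamV1, StreamV2, and EchoStream are hypothetical names with no Boost dependencies):

    #include <iostream>
    #include <string>

    // Before: one entry point with a mode flag the caller must remember.
    struct StreamV1
    {
        virtual ~StreamV1() = default;
        virtual void read(std::string& out, bool readSome) = 0;
    };

    // After: two entry points, one per behavior.
    struct StreamV2
    {
        virtual ~StreamV2() = default;
        virtual void read(std::string& out) = 0;      // read a complete message
        virtual void readSome(std::string& out) = 0;  // read whatever is available
    };

    struct EchoStream : StreamV2
    {
        void read(std::string& out) override { out = "complete message"; }
        void readSome(std::string& out) override { out = "partial chunk"; }
    };

    int
    main()
    {
        EchoStream s;
        std::string buf;
        s.readSome(buf);  // the call now documents itself
        std::cout << buf << '\n';
    }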
@@ -152,10 +152,10 @@ HTTPDownloader::do_session(
     //////////////////////////////////////////////
     // Prepare for download and establish the
     // connection:
-    std::uint64_t const rangeStart = size(p);
-
-    stream_ = ssl ? HTTPStream::makeUnique<SSLStream>(config_, strand_, j_)
-                  : HTTPStream::makeUnique<RawStream>(config_, strand_, j_);
+    if (ssl)
+        stream_ = std::make_unique<SSLStream>(config_, strand_, j_);
+    else
+        stream_ = std::make_unique<RawStream>(strand_);
 
     std::string error;
     if (!stream_->connect(error, host, port, yield))
@@ -166,6 +166,8 @@ HTTPDownloader::do_session(
     req.set(http::field::host, host);
     req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
 
+    std::uint64_t const rangeStart = size(p);
+
     // Requesting a portion of the file
     if (rangeStart)
     {
@@ -182,7 +184,7 @@ HTTPDownloader::do_session(
         // Read the response
        http::response_parser<http::empty_body> connectParser;
        connectParser.skip(true);
-        stream_->asyncRead(read_buf_, connectParser, false, yield, ec);
+        stream_->asyncRead(read_buf_, connectParser, yield, ec);
        if (ec)
            return failAndExit("async_read", p);
 
@@ -198,7 +200,7 @@ HTTPDownloader::do_session(
            http::response_parser<http::empty_body> rangeParser;
            rangeParser.skip(true);
 
-            stream_->asyncRead(read_buf_, rangeParser, false, yield, ec);
+            stream_->asyncRead(read_buf_, rangeParser, yield, ec);
            if (ec)
                return failAndExit("async_read_range_verify", p);
 
@@ -268,7 +270,7 @@ HTTPDownloader::do_session(
            return exit();
        }
 
-        stream_->asyncRead(read_buf_, *p, true, yield, ec);
+        stream_->asyncReadSome(read_buf_, *p, yield, ec);
     }
 
     JLOG(j_.trace()) << "download completed: " << dstPath.string();
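In the download loop above, header responses are read whole with asyncRead while the body is pulled in chunks via asyncReadSome until the parser finishes. A minimal synchronous analogue of that chunked-read loop over a std::istream (purely illustrative, no networking):

    #include <cstddef>
    #include <iostream>
    #include <sstream>
    #include <string>

    int
    main()
    {
        std::istringstream source("pretend this is a large response body");
        std::string body;
        char chunk[8];

        // Pull whatever is available, a chunk at a time, until the source is
        // exhausted -- the same shape as the asyncReadSome loop above.
        while (source.read(chunk, sizeof(chunk)) || source.gcount() > 0)
            body.append(chunk, static_cast<std::size_t>(source.gcount()));

        std::cout << body.size() << " bytes read\n";
    }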
@@ -40,8 +40,8 @@ SSLStream::getStream()
 bool
 SSLStream::connect(
     std::string& errorOut,
-    std::string const host,
-    std::string const port,
+    std::string const& host,
+    std::string const& port,
     boost::asio::yield_context& yield)
 {
     using namespace boost::asio;
@@ -49,8 +49,10 @@ SSLStream::connect(
 
     boost::system::error_code ec;
 
-    auto fail = [&errorOut](std::string const& errorIn) {
-        errorOut = errorIn;
+    auto fail = [&errorOut, &ec](
+                    std::string const& errorIn,
+                    std::string const& message = "") {
+        errorOut = errorIn + ": " + (message.empty() ? ec.message() : message);
         return false;
     };
 
@@ -65,7 +67,7 @@ SSLStream::connect(
     }
     catch (std::exception const& e)
     {
-        return fail(std::string("exception: ") + e.what());
+        return fail("exception", e.what());
     }
 
     ec = ssl_ctx_.preConnectVerify(*stream_, host);
@@ -101,21 +103,23 @@ void
 SSLStream::asyncRead(
     boost::beast::flat_buffer& buf,
     parser& p,
-    bool readSome,
     boost::asio::yield_context& yield,
     boost::system::error_code& ec)
 {
-    if (readSome)
-        boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
-    else
-        boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
+    boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
 }
 
-RawStream::RawStream(
-    Config const& config,
-    boost::asio::io_service::strand& strand,
-    beast::Journal j)
-    : strand_(strand)
+void
+SSLStream::asyncReadSome(
+    boost::beast::flat_buffer& buf,
+    parser& p,
+    boost::asio::yield_context& yield,
+    boost::system::error_code& ec)
+{
+    boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
+}
+
+RawStream::RawStream(boost::asio::io_service::strand& strand) : strand_(strand)
 {
 }
 
@@ -129,8 +133,8 @@ RawStream::getStream()
 bool
 RawStream::connect(
     std::string& errorOut,
-    std::string const host,
-    std::string const port,
+    std::string const& host,
+    std::string const& port,
     boost::asio::yield_context& yield)
 {
     using namespace boost::asio;
@@ -138,8 +142,10 @@ RawStream::connect(
 
     boost::system::error_code ec;
 
-    auto fail = [&errorOut](std::string const& errorIn) {
-        errorOut = errorIn;
+    auto fail = [&errorOut, &ec](
+                    std::string const& errorIn,
+                    std::string const& message = "") {
+        errorOut = errorIn + ": " + (message.empty() ? ec.message() : message);
        return false;
     };
 
@@ -154,7 +160,7 @@ RawStream::connect(
     }
     catch (std::exception const& e)
     {
-        return fail(std::string("exception: ") + e.what());
+        return fail("exception", e.what());
     }
 
     boost::asio::async_connect(
@@ -178,14 +184,20 @@ void
 RawStream::asyncRead(
     boost::beast::flat_buffer& buf,
     parser& p,
-    bool readSome,
     boost::asio::yield_context& yield,
     boost::system::error_code& ec)
 {
-    if (readSome)
-        boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
-    else
-        boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
+    boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
+}
+
+void
+RawStream::asyncReadSome(
+    boost::beast::flat_buffer& buf,
+    parser& p,
+    boost::asio::yield_context& yield,
+    boost::system::error_code& ec)
+{
+    boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
 }
 
 } // namespace ripple
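Both connect() implementations now build errorOut from a short label plus either an explicit message or the text of the captured error code. A standalone sketch of that lambda pattern using std::error_code instead of boost::system::error_code (the surrounding main() is invented for illustration):

    #include <iostream>
    #include <string>
    #include <system_error>

    int
    main()
    {
        std::error_code ec = std::make_error_code(std::errc::connection_refused);
        std::string errorOut;

        auto fail = [&errorOut, &ec](
                        std::string const& errorIn,
                        std::string const& message = "") {
            // Prefer an explicit message; otherwise describe the error code.
            errorOut = errorIn + ": " + (message.empty() ? ec.message() : message);
            return false;
        };

        fail("async_connect");      // uses ec.message()
        std::cout << errorOut << '\n';

        fail("exception", "boom");  // uses the explicit message
        std::cout << errorOut << '\n';
    }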
@@ -78,13 +78,13 @@ public:
     virtual boost::optional<std::uint32_t>
     prepareLedger(std::uint32_t validLedgerSeq) = 0;
 
-    /** Prepare a shard index to be imported into the database
+    /** Prepare one or more shard indexes to be imported into the database
 
-        @param shardIndex Shard index to be prepared for import
-        @return true if shard index successfully prepared for import
+        @param shardIndexes Shard indexes to be prepared for import
+        @return true if all shard indexes successfully prepared for import
     */
     virtual bool
-    prepareShard(std::uint32_t shardIndex) = 0;
+    prepareShards(std::vector<std::uint32_t> const& shardIndexes) = 0;
 
     /** Remove a previously prepared shard index for import
 
@@ -293,62 +293,110 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
 }
 
 bool
-DatabaseShardImp::prepareShard(std::uint32_t shardIndex)
+DatabaseShardImp::prepareShards(std::vector<std::uint32_t> const& shardIndexes)
 {
-    auto fail = [j = j_, shardIndex](std::string const& msg) {
-        JLOG(j.error()) << "shard " << shardIndex << " " << msg;
+    auto fail = [j = j_, &shardIndexes](
+                    std::string const& msg,
+                    boost::optional<std::uint32_t> shardIndex = boost::none) {
+        auto multipleIndexPrequel = [&shardIndexes] {
+            std::vector<std::string> indexesAsString(shardIndexes.size());
+            std::transform(
+                shardIndexes.begin(),
+                shardIndexes.end(),
+                indexesAsString.begin(),
+                [](uint32_t const index) { return std::to_string(index); });
+
+            return std::string("shard") +
+                (shardIndexes.size() > 1 ? "s " : " ") +
+                boost::algorithm::join(indexesAsString, ", ");
+        };
+
+        std::string const prequel = shardIndex
+            ? "shard " + std::to_string(*shardIndex)
+            : multipleIndexPrequel();
+
+        JLOG(j.error()) << prequel << " " << msg;
         return false;
     };
 
-    if (shardIndex < earliestShardIndex())
-    {
-        return fail(
-            "comes before earliest shard index " +
-            std::to_string(earliestShardIndex()));
-    }
-
-    // If we are synced to the network, check if the shard index is
-    // greater or equal to the current or validated shard index.
-    auto seqCheck = [&](std::uint32_t ledgerSeq) {
-        if (ledgerSeq >= earliestLedgerSeq() &&
-            shardIndex >= seqToShardIndex(ledgerSeq))
-        {
-            return fail("invalid index");
-        }
-        return true;
-    };
-    if (!seqCheck(app_.getLedgerMaster().getValidLedgerIndex() + 1) ||
-        !seqCheck(app_.getLedgerMaster().getCurrentLedgerIndex()))
-    {
-        return fail("invalid index");
-    }
-
-    // Any shard earlier than the two most recent shards is a historical shard
-    auto const isHistoricalShard{shardIndex < shardBoundaryIndex()};
-
     std::lock_guard lock(mutex_);
     assert(init_);
 
     if (!canAdd_)
         return fail("cannot be stored at this time");
 
-    // Check shard count and available storage space
-    if (isHistoricalShard && numHistoricalShards(lock) >= maxHistoricalShards_)
-        return fail("maximum number of historical shards reached");
+    auto historicalShardsToPrepare = 0;
 
-    if (!sufficientStorage(
-            1,
-            isHistoricalShard ? PathDesignation::historical
-                              : PathDesignation::none,
-            lock))
+    for (auto const shardIndex : shardIndexes)
     {
-        return fail("insufficient storage space available");
+        if (shardIndex < earliestShardIndex())
+        {
+            return fail(
+                "comes before earliest shard index " +
+                    std::to_string(earliestShardIndex()),
+                shardIndex);
+        }
+
+        // If we are synced to the network, check if the shard index is
+        // greater or equal to the current or validated shard index.
+        auto seqCheck = [&](std::uint32_t ledgerSeq) {
+            if (ledgerSeq >= earliestLedgerSeq() &&
+                shardIndex >= seqToShardIndex(ledgerSeq))
+            {
+                return fail("invalid index", shardIndex);
+            }
+            return true;
+        };
+        if (!seqCheck(app_.getLedgerMaster().getValidLedgerIndex() + 1) ||
+            !seqCheck(app_.getLedgerMaster().getCurrentLedgerIndex()))
+        {
+            return fail("invalid index", shardIndex);
+        }
+
+        if (shards_.find(shardIndex) != shards_.end())
+            return fail("is already stored", shardIndex);
+
+        if (preparedIndexes_.find(shardIndex) != preparedIndexes_.end())
+            return fail("is already queued for import", shardIndex);
+
+        // Any shard earlier than the two most recent shards
+        // is a historical shard
+        if (shardIndex < shardBoundaryIndex())
+            ++historicalShardsToPrepare;
     }
 
-    if (shards_.find(shardIndex) != shards_.end())
-        return fail("already stored");
-    if (!preparedIndexes_.emplace(shardIndex).second)
-        return fail("already queued for import");
+    auto const numHistShards = numHistoricalShards(lock);
+
+    // Check shard count and available storage space
+    if (numHistShards + historicalShardsToPrepare > maxHistoricalShards_)
+        return fail("maximum number of historical shards reached");
+
+    if (historicalShardsToPrepare)
+    {
+        // Check available storage space for historical shards
+        if (!sufficientStorage(
+                historicalShardsToPrepare, PathDesignation::historical, lock))
+            return fail("insufficient storage space available");
+    }
+
+    if (auto const recentShardsToPrepare =
+            shardIndexes.size() - historicalShardsToPrepare;
+        recentShardsToPrepare)
+    {
+        // Check available storage space for recent shards
+        if (!sufficientStorage(
+                recentShardsToPrepare, PathDesignation::none, lock))
+            return fail("insufficient storage space available");
+    }
+
+    for (auto const shardIndex : shardIndexes)
+    {
+        auto const prepareSuccessful =
+            preparedIndexes_.emplace(shardIndex).second;
+
+        (void)prepareSuccessful;
+        assert(prepareSuccessful);
+    }
 
     return true;
 }
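prepareShards validates every requested index and tallies how many would be historical before it reserves any of them, so a batch either queues completely or not at all. A compact standalone sketch of that validate-all-then-commit shape (prepareAll, boundary, and maxHistorical are invented names and limits, purely illustrative):

    #include <cassert>
    #include <cstdint>
    #include <iostream>
    #include <set>
    #include <vector>

    bool
    prepareAll(
        std::vector<std::uint32_t> const& indexes,
        std::set<std::uint32_t>& prepared,
        std::uint32_t boundary,     // indexes below this count as "historical"
        std::size_t maxHistorical)  // capacity left for historical entries
    {
        std::size_t historical = 0;

        // Pass 1: validate everything, commit nothing.
        for (auto const index : indexes)
        {
            if (prepared.count(index))
                return false;  // already queued
            if (index < boundary)
                ++historical;
        }
        if (historical > maxHistorical)
            return false;  // the whole batch would not fit

        // Pass 2: the batch is known to fit, so every insert must succeed.
        for (auto const index : indexes)
        {
            bool const ok = prepared.insert(index).second;
            assert(ok);
            (void)ok;
        }
        return true;
    }

    int
    main()
    {
        std::set<std::uint32_t> prepared;
        std::cout << prepareAll({1, 2, 3}, prepared, 10, 2) << '\n';  // 0: too many historical
        std::cout << prepareAll({1, 2}, prepared, 10, 2) << '\n';     // 1: queued atomically
    }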
@@ -55,7 +55,7 @@ public:
     prepareLedger(std::uint32_t validLedgerSeq) override;
 
     bool
-    prepareShard(std::uint32_t shardIndex) override;
+    prepareShards(std::vector<std::uint32_t> const& shardIndexes) override;
 
     void
     removePreShard(std::uint32_t shardIndex) override;
@@ -247,9 +247,6 @@ ShardArchiveHandler::add(
     if (it != archives_.end())
         return url == it->second;
 
-    if (!app_.getShardStore()->prepareShard(shardIndex))
-        return false;
-
     archives_.emplace(shardIndex, std::move(url));
 
     return true;
@@ -275,6 +272,16 @@ ShardArchiveHandler::start()
         return false;
     }
 
+    std::vector<std::uint32_t> shardIndexes(archives_.size());
+    std::transform(
+        archives_.begin(),
+        archives_.end(),
+        shardIndexes.begin(),
+        [](auto const& entry) { return entry.first; });
+
+    if (!app_.getShardStore()->prepareShards(shardIndexes))
+        return false;
+
     try
     {
         // Create temp root download directory
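start() now gathers every queued shard index out of archives_ and hands the whole batch to prepareShards in one call, instead of preparing each shard as it is added. A small standalone sketch of collecting map keys with std::transform (a generic map, not the handler itself; the URLs are placeholders):

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    int
    main()
    {
        std::map<std::uint32_t, std::string> archives{
            {1, "https://example.com/1.tar.lz4"},
            {2, "https://example.com/2.tar.lz4"}};

        // Size the destination up front, then copy out the keys.
        std::vector<std::uint32_t> shardIndexes(archives.size());
        std::transform(
            archives.begin(),
            archives.end(),
            shardIndexes.begin(),
            [](auto const& entry) { return entry.first; });

        for (auto const index : shardIndexes)
            std::cout << index << '\n';
    }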
@@ -30,6 +30,7 @@ namespace test {
 */
 class CaptureLogs : public Logs
 {
+    std::mutex strmMutex_;
     std::stringstream strm_;
     std::string* pResult_;
 
@@ -38,13 +39,17 @@ class CaptureLogs : public Logs
     */
     class CaptureSink : public beast::Journal::Sink
     {
+        std::mutex& strmMutex_;
         std::stringstream& strm_;
 
     public:
         CaptureSink(
             beast::severities::Severity threshold,
+            std::mutex& mutex,
             std::stringstream& strm)
-            : beast::Journal::Sink(threshold, false), strm_(strm)
+            : beast::Journal::Sink(threshold, false)
+            , strmMutex_(mutex)
+            , strm_(strm)
         {
         }
 
@@ -52,6 +57,7 @@ class CaptureLogs : public Logs
         write(beast::severities::Severity level, std::string const& text)
             override
         {
+            std::lock_guard lock(strmMutex_);
             strm_ << text;
         }
     };
@@ -72,7 +78,7 @@ public:
         std::string const& partition,
         beast::severities::Severity threshold) override
     {
-        return std::make_unique<CaptureSink>(threshold, strm_);
+        return std::make_unique<CaptureSink>(threshold, strmMutex_, strm_);
     }
 };
 
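CaptureLogs now shares one mutex between the parent object and every CaptureSink it creates, so concurrent write() calls cannot interleave on the shared stringstream. A minimal standalone sketch of that locking scheme (the Capture class below is a simplified stand-in, not the beast::Journal types):

    #include <iostream>
    #include <mutex>
    #include <sstream>
    #include <string>
    #include <thread>
    #include <vector>

    class Capture
    {
        std::mutex strmMutex_;
        std::stringstream strm_;

    public:
        // Every writer serializes on the same mutex owned by Capture.
        void
        write(std::string const& text)
        {
            std::lock_guard lock(strmMutex_);
            strm_ << text << '\n';
        }

        std::string
        result()
        {
            std::lock_guard lock(strmMutex_);
            return strm_.str();
        }
    };

    int
    main()
    {
        Capture logs;
        std::vector<std::thread> writers;
        for (int i = 0; i < 4; ++i)
            writers.emplace_back(
                [&logs, i] { logs.write("line from thread " + std::to_string(i)); });
        for (auto& t : writers)
            t.join();
        std::cout << logs.result();
    }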
@@ -647,9 +647,9 @@ class DatabaseShard_test : public TestBase
     }
 
     void
-    testPrepareShard(std::uint64_t const seedValue)
+    testPrepareShards(std::uint64_t const seedValue)
     {
-        testcase("Prepare shard");
+        testcase("Prepare shards");
 
         using namespace test::jtx;
 
@@ -675,7 +675,7 @@ class DatabaseShard_test : public TestBase
             }
             else
             {
-                db->prepareShard(n);
+                db->prepareShards({n});
                 bitMask |= 1ll << n;
             }
            BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask));
@@ -683,11 +683,11 @@ class DatabaseShard_test : public TestBase
 
         // test illegal cases
         // adding shards with too large number
-        db->prepareShard(0);
+        db->prepareShards({0});
         BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask));
-        db->prepareShard(nTestShards + 1);
+        db->prepareShards({nTestShards + 1});
         BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask));
-        db->prepareShard(nTestShards + 2);
+        db->prepareShards({nTestShards + 2});
         BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask));
 
         // create shards which are not prepared for import
@@ -753,7 +753,7 @@ class DatabaseShard_test : public TestBase
         if (!BEAST_EXPECT(data.makeLedgers(env)))
             return;
 
-        db->prepareShard(1);
+        db->prepareShards({1});
         BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(2));
 
         using namespace boost::filesystem;
@@ -1338,7 +1338,7 @@ public:
         testCreateShard(seedValue);
         testReopenDatabase(seedValue + 10);
         testGetCompleteShards(seedValue + 20);
-        testPrepareShard(seedValue + 30);
+        testPrepareShards(seedValue + 30);
         testImportShard(seedValue + 40);
         testCorruptedDatabase(seedValue + 50);
         testIllegalFinalKey(seedValue + 60);
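The test call sites change from prepareShard(n) to prepareShards({n}): a braced list at the call site builds the std::vector<std::uint32_t> argument in place. A tiny standalone sketch of that calling convention (the stand-in function below only mirrors the parameter shape, it is not the real interface):

    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Stand-in with the same parameter shape as prepareShards.
    bool
    prepareShards(std::vector<std::uint32_t> const& shardIndexes)
    {
        return !shardIndexes.empty();
    }

    int
    main()
    {
        std::cout << prepareShards({1}) << '\n';        // single index, brace-initialized
        std::cout << prepareShards({1, 2, 3}) << '\n';  // several indexes in one call
    }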
@@ -24,6 +24,7 @@
 #include <ripple/nodestore/impl/DecodedBlob.h>
 #include <ripple/protocol/jss.h>
 #include <ripple/rpc/ShardArchiveHandler.h>
+#include <test/jtx/CaptureLogs.h>
 #include <test/jtx/Env.h>
 #include <test/jtx/TrustedPublisherServer.h>
 #include <test/jtx/envconfig.h>
@@ -49,9 +50,9 @@ class ShardArchiveHandler_test : public beast::unit_test::suite
     }
 
 public:
-    // Test the shard downloading module by initiating
-    // and completing a download and verifying the
-    // contents of the state database.
+    // Test the shard downloading module by queueing
+    // a download and verifying the contents of the
+    // state database.
     void
     testSingleDownloadAndStateDB()
     {
@@ -98,9 +99,9 @@ public:
         handler->release();
     }
 
-    // Test the shard downloading module by initiating
-    // and completing three downloads and verifying
-    // the contents of the state database.
+    // Test the shard downloading module by queueing
+    // three downloads and verifying the contents of
+    // the state database.
     void
     testDownloadsAndStateDB()
     {
@@ -414,6 +415,251 @@ public:
         BEAST_EXPECT(!boost::filesystem::exists(stateDir));
     }
 
+    // Ensure that downloads fail when the shard
+    // database cannot store any more shards
+    void
+    testShardCountFailure()
+    {
+        testcase("testShardCountFailure");
+        std::string capturedLogs;
+
+        {
+            beast::temp_dir tempDir;
+
+            auto c = jtx::envconfig();
+            auto& section = c->section(ConfigSection::shardDatabase());
+            section.set("path", tempDir.path());
+            section.set("max_historical_shards", "1");
+            section.set("ledgers_per_shard", "256");
+            section.set("earliest_seq", "257");
+            auto& sectionNode = c->section(ConfigSection::nodeDatabase());
+            sectionNode.set("earliest_seq", "257");
+            c->setupControl(true, true, true);
+
+            std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
+            jtx::Env env(*this, std::move(c), std::move(logs));
+
+            std::uint8_t const numberOfDownloads = 10;
+
+            // Create some ledgers so that the ShardArchiveHandler
+            // can verify the last ledger hash for the shard
+            // downloads.
+            for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
+                     (numberOfDownloads + 1);
+                 ++i)
+            {
+                env.close();
+            }
+
+            auto handler = env.app().getShardArchiveHandler();
+            BEAST_EXPECT(handler);
+            BEAST_EXPECT(
+                dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);
+
+            auto server = createServer(env);
+            auto host = server->local_endpoint().address().to_string();
+            auto port = std::to_string(server->local_endpoint().port());
+            server->stop();
+
+            Downloads const dl = [count = numberOfDownloads, &host, &port] {
+                Downloads ret;
+
+                for (int i = 1; i <= count; ++i)
+                {
+                    ret.push_back(
+                        {i,
+                         (boost::format("https://%s:%d/%d.tar.lz4") % host %
+                          port % i)
+                             .str()});
+                }
+
+                return ret;
+            }();
+
+            for (auto const& entry : dl)
+            {
+                parsedURL url;
+                parseUrl(url, entry.second);
+                handler->add(entry.first, {url, entry.second});
+            }
+
+            BEAST_EXPECT(!handler->start());
+            auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
+                env.app().config());
+
+            handler->release();
+            BEAST_EXPECT(!boost::filesystem::exists(stateDir));
+        }
+
+        auto const expectedErrorMessage =
+            "shards 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 maximum number of historical "
+            "shards reached";
+        BEAST_EXPECT(
+            capturedLogs.find(expectedErrorMessage) != std::string::npos);
+
+        {
+            beast::temp_dir tempDir;
+
+            auto c = jtx::envconfig();
+            auto& section = c->section(ConfigSection::shardDatabase());
+            section.set("path", tempDir.path());
+            section.set("max_historical_shards", "0");
+            section.set("ledgers_per_shard", "256");
+            section.set("earliest_seq", "257");
+            auto& sectionNode = c->section(ConfigSection::nodeDatabase());
+            sectionNode.set("earliest_seq", "257");
+            c->setupControl(true, true, true);
+
+            std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
+            jtx::Env env(*this, std::move(c), std::move(logs));
+
+            std::uint8_t const numberOfDownloads = 1;
+
+            // Create some ledgers so that the ShardArchiveHandler
+            // can verify the last ledger hash for the shard
+            // downloads.
+            for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
+                     ((numberOfDownloads * 3) + 1);
+                 ++i)
+            {
+                env.close();
+            }
+
+            auto handler = env.app().getShardArchiveHandler();
+            BEAST_EXPECT(handler);
+            BEAST_EXPECT(
+                dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);
+
+            auto server = createServer(env);
+            auto host = server->local_endpoint().address().to_string();
+            auto port = std::to_string(server->local_endpoint().port());
+            server->stop();
+
+            Downloads const dl = [count = numberOfDownloads, &host, &port] {
+                Downloads ret;
+
+                for (int i = 1; i <= count; ++i)
+                {
+                    ret.push_back(
+                        {i,
+                         (boost::format("https://%s:%d/%d.tar.lz4") % host %
+                          port % i)
+                             .str()});
+                }
+
+                return ret;
+            }();
+
+            for (auto const& entry : dl)
+            {
+                parsedURL url;
+                parseUrl(url, entry.second);
+                handler->add(entry.first, {url, entry.second});
+            }
+
+            BEAST_EXPECT(!handler->start());
+            auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
+                env.app().config());
+
+            handler->release();
+            BEAST_EXPECT(!boost::filesystem::exists(stateDir));
+        }
+
+        auto const expectedErrorMessage2 =
+            "shard 1 maximum number of historical shards reached";
+        BEAST_EXPECT(
+            capturedLogs.find(expectedErrorMessage2) != std::string::npos);
+    }
+
+    // Ensure that downloads fail when the shard
+    // database has already stored one of the
+    // queued shards
+    void
+    testRedundantShardFailure()
+    {
+        testcase("testRedundantShardFailure");
+        std::string capturedLogs;
+
+        {
+            beast::temp_dir tempDir;
+
+            auto c = jtx::envconfig();
+            auto& section = c->section(ConfigSection::shardDatabase());
+            section.set("path", tempDir.path());
+            section.set("max_historical_shards", "1");
+            section.set("ledgers_per_shard", "256");
+            section.set("earliest_seq", "257");
+            auto& sectionNode = c->section(ConfigSection::nodeDatabase());
+            sectionNode.set("earliest_seq", "257");
+            c->setupControl(true, true, true);
+
+            std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
+            jtx::Env env(
+                *this,
+                std::move(c),
+                std::move(logs),
+                beast::severities::kDebug);
+
+            std::uint8_t const numberOfDownloads = 10;
+
+            // Create some ledgers so that the ShardArchiveHandler
+            // can verify the last ledger hash for the shard
+            // downloads.
+            for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
+                     (numberOfDownloads + 1);
+                 ++i)
+            {
+                env.close();
+            }
+
+            env.app().getShardStore()->prepareShards({1});
+
+            auto handler = env.app().getShardArchiveHandler();
+            BEAST_EXPECT(handler);
+            BEAST_EXPECT(
+                dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);
+
+            auto server = createServer(env);
+            auto host = server->local_endpoint().address().to_string();
+            auto port = std::to_string(server->local_endpoint().port());
+            server->stop();
+
+            Downloads const dl = [count = numberOfDownloads, &host, &port] {
+                Downloads ret;
+
+                for (int i = 1; i <= count; ++i)
+                {
+                    ret.push_back(
+                        {i,
+                         (boost::format("https://%s:%d/%d.tar.lz4") % host %
+                          port % i)
+                             .str()});
+                }
+
+                return ret;
+            }();
+
+            for (auto const& entry : dl)
+            {
+                parsedURL url;
+                parseUrl(url, entry.second);
+                handler->add(entry.first, {url, entry.second});
+            }
+
+            BEAST_EXPECT(!handler->start());
+            auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
+                env.app().config());
+
+            handler->release();
+            BEAST_EXPECT(!boost::filesystem::exists(stateDir));
+        }
+
+        auto const expectedErrorMessage =
+            "shard 1 is already queued for import";
+        BEAST_EXPECT(
+            capturedLogs.find(expectedErrorMessage) != std::string::npos);
+    }
+
     void
     run() override
     {
@@ -421,6 +667,8 @@ public:
         testDownloadsAndStateDB();
         testDownloadsAndFileSystem();
         testDownloadsAndRestart();
+        testShardCountFailure();
+        testRedundantShardFailure();
     }
 };
 
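The new tests assert on captured log lines such as "shards 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 maximum number of historical shards reached", which the fail lambda builds by joining the requested indexes with ", " and pluralizing "shard". A standalone sketch of producing that prefix without Boost (shardPrequel is a hand-rolled helper written only for this illustration):

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    std::string
    shardPrequel(std::vector<std::uint32_t> const& shardIndexes)
    {
        std::string out = shardIndexes.size() > 1 ? "shards " : "shard ";
        for (std::size_t i = 0; i < shardIndexes.size(); ++i)
        {
            if (i)
                out += ", ";
            out += std::to_string(shardIndexes[i]);
        }
        return out;
    }

    int
    main()
    {
        std::cout << shardPrequel({1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
                  << " maximum number of historical shards reached\n";
    }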