diff --git a/src/ripple/core/impl/DatabaseCon.cpp b/src/ripple/core/impl/DatabaseCon.cpp
index 064f44b50..b7f6306d4 100644
--- a/src/ripple/core/impl/DatabaseCon.cpp
+++ b/src/ripple/core/impl/DatabaseCon.cpp
@@ -86,6 +86,19 @@ DatabaseCon::~DatabaseCon()
     if (checkpointer_)
     {
         checkpointers.erase(checkpointer_->id());
+
+        std::weak_ptr wk(checkpointer_);
+        checkpointer_.reset();
+
+        // The references to our Checkpointer held by 'checkpointer_' and
+        // 'checkpointers' have been removed, so if the use count is nonzero, a
+        // checkpoint is currently in progress. Wait for it to end, otherwise
+        // creating a new DatabaseCon to the same database may fail due to the
+        // database being locked by our (now old) Checkpointer.
+        while (wk.use_count())
+        {
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        }
     }
 }
 
diff --git a/src/ripple/net/HTTPStream.h b/src/ripple/net/HTTPStream.h
index a0946a3c4..767d33948 100644
--- a/src/ripple/net/HTTPStream.h
+++ b/src/ripple/net/HTTPStream.h
@@ -38,24 +38,14 @@ public:
 
     virtual ~HTTPStream() = default;
 
-    template <class T>
-    static std::unique_ptr<HTTPStream>
-    makeUnique(
-        Config const& config,
-        boost::asio::io_service::strand& strand,
-        beast::Journal j)
-    {
-        return std::make_unique<T>(config, strand, j);
-    }
-
     [[nodiscard]] virtual boost::asio::ip::tcp::socket&
     getStream() = 0;
 
     [[nodiscard]] virtual bool
     connect(
         std::string& errorOut,
-        std::string const host,
-        std::string const port,
+        std::string const& host,
+        std::string const& port,
         boost::asio::yield_context& yield) = 0;
 
     virtual void
@@ -68,7 +58,13 @@ public:
     asyncRead(
         boost::beast::flat_buffer& buf,
         parser& p,
-        bool readSome,
+        boost::asio::yield_context& yield,
+        boost::system::error_code& ec) = 0;
+
+    virtual void
+    asyncReadSome(
+        boost::beast::flat_buffer& buf,
+        parser& p,
         boost::asio::yield_context& yield,
         boost::system::error_code& ec) = 0;
 };
@@ -89,8 +85,8 @@ public:
     bool
     connect(
         std::string& errorOut,
-        std::string const host,
-        std::string const port,
+        std::string const& host,
+        std::string const& port,
         boost::asio::yield_context& yield) override;
 
     void
@@ -103,7 +99,13 @@ public:
     asyncRead(
         boost::beast::flat_buffer& buf,
         parser& p,
-        bool readSome,
+        boost::asio::yield_context& yield,
+        boost::system::error_code& ec) override;
+
+    void
+    asyncReadSome(
+        boost::beast::flat_buffer& buf,
+        parser& p,
         boost::asio::yield_context& yield,
         boost::system::error_code& ec) override;
 
@@ -117,10 +119,7 @@ private:
 class RawStream : public HTTPStream
 {
 public:
-    RawStream(
-        Config const& config,
-        boost::asio::io_service::strand& strand,
-        beast::Journal j);
+    RawStream(boost::asio::io_service::strand& strand);
 
     virtual ~RawStream() = default;
 
@@ -130,8 +129,8 @@ public:
     bool
     connect(
         std::string& errorOut,
-        std::string const host,
-        std::string const port,
+        std::string const& host,
+        std::string const& port,
         boost::asio::yield_context& yield) override;
 
     void
@@ -144,7 +143,13 @@ public:
     asyncRead(
         boost::beast::flat_buffer& buf,
         parser& p,
-        bool readSome,
+        boost::asio::yield_context& yield,
+        boost::system::error_code& ec) override;
+
+    void
+    asyncReadSome(
+        boost::beast::flat_buffer& buf,
+        parser& p,
         boost::asio::yield_context& yield,
         boost::system::error_code& ec) override;
 
diff --git a/src/ripple/net/impl/HTTPDownloader.cpp b/src/ripple/net/impl/HTTPDownloader.cpp
index f0d554684..3e3b511a7 100644
--- a/src/ripple/net/impl/HTTPDownloader.cpp
+++ b/src/ripple/net/impl/HTTPDownloader.cpp
@@ -152,10 +152,10 @@ HTTPDownloader::do_session(
     //////////////////////////////////////////////
     // Prepare for download and establish the
     // connection:
-    std::uint64_t const rangeStart = size(p);
-
-    stream_ = ssl ? HTTPStream::makeUnique<SSLStream>(config_, strand_, j_)
-                  : HTTPStream::makeUnique<RawStream>(config_, strand_, j_);
+    if (ssl)
+        stream_ = std::make_unique<SSLStream>(config_, strand_, j_);
+    else
+        stream_ = std::make_unique<RawStream>(strand_);
 
     std::string error;
     if (!stream_->connect(error, host, port, yield))
@@ -166,6 +166,8 @@ HTTPDownloader::do_session(
     req.set(http::field::host, host);
     req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
 
+    std::uint64_t const rangeStart = size(p);
+
     // Requesting a portion of the file
     if (rangeStart)
     {
@@ -182,7 +184,7 @@ HTTPDownloader::do_session(
     // Read the response
     http::response_parser<http::empty_body> connectParser;
     connectParser.skip(true);
-    stream_->asyncRead(read_buf_, connectParser, false, yield, ec);
+    stream_->asyncRead(read_buf_, connectParser, yield, ec);
 
     if (ec)
         return failAndExit("async_read", p);
@@ -198,7 +200,7 @@ HTTPDownloader::do_session(
 
         http::response_parser<http::empty_body> rangeParser;
         rangeParser.skip(true);
-        stream_->asyncRead(read_buf_, rangeParser, false, yield, ec);
+        stream_->asyncRead(read_buf_, rangeParser, yield, ec);
 
         if (ec)
             return failAndExit("async_read_range_verify", p);
@@ -268,7 +270,7 @@ HTTPDownloader::do_session(
             return exit();
         }
 
-        stream_->asyncRead(read_buf_, *p, true, yield, ec);
+        stream_->asyncReadSome(read_buf_, *p, yield, ec);
     }
 
     JLOG(j_.trace()) << "download completed: " << dstPath.string();
diff --git a/src/ripple/net/impl/HTTPStream.cpp b/src/ripple/net/impl/HTTPStream.cpp
index d0386edb6..d23576773 100644
--- a/src/ripple/net/impl/HTTPStream.cpp
+++ b/src/ripple/net/impl/HTTPStream.cpp
@@ -40,8 +40,8 @@ SSLStream::getStream()
 bool
 SSLStream::connect(
     std::string& errorOut,
-    std::string const host,
-    std::string const port,
+    std::string const& host,
+    std::string const& port,
     boost::asio::yield_context& yield)
 {
     using namespace boost::asio;
@@ -49,8 +49,10 @@ SSLStream::connect(
 
     boost::system::error_code ec;
 
-    auto fail = [&errorOut](std::string const& errorIn) {
-        errorOut = errorIn;
+    auto fail = [&errorOut, &ec](
+                    std::string const& errorIn,
+                    std::string const& message = "") {
+        errorOut = errorIn + ": " + (message.empty() ? ec.message() : message);
         return false;
     };
 
@@ -65,7 +67,7 @@ SSLStream::connect(
     }
     catch (std::exception const& e)
     {
-        return fail(std::string("exception: ") + e.what());
+        return fail("exception", e.what());
     }
 
     ec = ssl_ctx_.preConnectVerify(*stream_, host);
@@ -101,21 +103,23 @@ void
 SSLStream::asyncRead(
     boost::beast::flat_buffer& buf,
     parser& p,
-    bool readSome,
     boost::asio::yield_context& yield,
     boost::system::error_code& ec)
 {
-    if (readSome)
-        boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
-    else
-        boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
+    boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
 }
 
-RawStream::RawStream(
-    Config const& config,
-    boost::asio::io_service::strand& strand,
-    beast::Journal j)
-    : strand_(strand)
+void
+SSLStream::asyncReadSome(
+    boost::beast::flat_buffer& buf,
+    parser& p,
+    boost::asio::yield_context& yield,
+    boost::system::error_code& ec)
+{
+    boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
+}
+
+RawStream::RawStream(boost::asio::io_service::strand& strand) : strand_(strand)
 {
 }
 
@@ -129,8 +133,8 @@ RawStream::getStream()
 bool
 RawStream::connect(
     std::string& errorOut,
-    std::string const host,
-    std::string const port,
+    std::string const& host,
+    std::string const& port,
     boost::asio::yield_context& yield)
 {
     using namespace boost::asio;
@@ -138,8 +142,10 @@ RawStream::connect(
 
     boost::system::error_code ec;
 
-    auto fail = [&errorOut](std::string const& errorIn) {
-        errorOut = errorIn;
+    auto fail = [&errorOut, &ec](
+                    std::string const& errorIn,
+                    std::string const& message = "") {
+        errorOut = errorIn + ": " + (message.empty() ? ec.message() : message);
         return false;
     };
 
@@ -154,7 +160,7 @@ RawStream::connect(
     }
     catch (std::exception const& e)
     {
-        return fail(std::string("exception: ") + e.what());
+        return fail("exception", e.what());
     }
 
     boost::asio::async_connect(
@@ -178,14 +184,20 @@ void
 RawStream::asyncRead(
     boost::beast::flat_buffer& buf,
     parser& p,
-    bool readSome,
     boost::asio::yield_context& yield,
     boost::system::error_code& ec)
 {
-    if (readSome)
-        boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
-    else
-        boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
+    boost::beast::http::async_read(*stream_, buf, p, yield[ec]);
+}
+
+void
+RawStream::asyncReadSome(
+    boost::beast::flat_buffer& buf,
+    parser& p,
+    boost::asio::yield_context& yield,
+    boost::system::error_code& ec)
+{
+    boost::beast::http::async_read_some(*stream_, buf, p, yield[ec]);
 }
 
 }  // namespace ripple
diff --git a/src/ripple/nodestore/DatabaseShard.h b/src/ripple/nodestore/DatabaseShard.h
index 1f3f92530..4a4f013fa 100644
--- a/src/ripple/nodestore/DatabaseShard.h
+++ b/src/ripple/nodestore/DatabaseShard.h
@@ -78,13 +78,13 @@ public:
     virtual boost::optional<std::uint32_t>
     prepareLedger(std::uint32_t validLedgerSeq) = 0;
 
-    /** Prepare a shard index to be imported into the database
+    /** Prepare one or more shard indexes to be imported into the database
 
-        @param shardIndex Shard index to be prepared for import
-        @return true if shard index successfully prepared for import
+        @param shardIndexes Shard indexes to be prepared for import
+        @return true if all shard indexes successfully prepared for import
     */
     virtual bool
-    prepareShard(std::uint32_t shardIndex) = 0;
+    prepareShards(std::vector<std::uint32_t> const& shardIndexes) = 0;
 
     /** Remove a previously prepared shard index for import
diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp
index 71683ce8f..f05ddf9e5 100644
--- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp
+++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp
@@ -293,62 +293,110 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
 }
 
 bool
-DatabaseShardImp::prepareShard(std::uint32_t shardIndex)
+DatabaseShardImp::prepareShards(std::vector<std::uint32_t> const& shardIndexes)
 {
-    auto fail = [j = j_, shardIndex](std::string const& msg) {
-        JLOG(j.error()) << "shard " << shardIndex << " " << msg;
+    auto fail = [j = j_, &shardIndexes](
+                    std::string const& msg,
+                    boost::optional<std::uint32_t> shardIndex = boost::none) {
+        auto multipleIndexPrequel = [&shardIndexes] {
+            std::vector<std::string> indexesAsString(shardIndexes.size());
+            std::transform(
+                shardIndexes.begin(),
+                shardIndexes.end(),
+                indexesAsString.begin(),
+                [](uint32_t const index) { return std::to_string(index); });
+
+            return std::string("shard") +
+                (shardIndexes.size() > 1 ? "s " : " ") +
+                boost::algorithm::join(indexesAsString, ", ");
+        };
+
+        std::string const prequel = shardIndex
+            ? "shard " + std::to_string(*shardIndex)
+            : multipleIndexPrequel();
+
+        JLOG(j.error()) << prequel << " " << msg;
         return false;
     };
 
-    if (shardIndex < earliestShardIndex())
-    {
-        return fail(
-            "comes before earliest shard index " +
-            std::to_string(earliestShardIndex()));
-    }
-
-    // If we are synced to the network, check if the shard index is
-    // greater or equal to the current or validated shard index.
-    auto seqCheck = [&](std::uint32_t ledgerSeq) {
-        if (ledgerSeq >= earliestLedgerSeq() &&
-            shardIndex >= seqToShardIndex(ledgerSeq))
-        {
-            return fail("invalid index");
-        }
-        return true;
-    };
-    if (!seqCheck(app_.getLedgerMaster().getValidLedgerIndex() + 1) ||
-        !seqCheck(app_.getLedgerMaster().getCurrentLedgerIndex()))
-    {
-        return fail("invalid index");
-    }
-
-    // Any shard earlier than the two most recent shards is a historical shard
-    auto const isHistoricalShard{shardIndex < shardBoundaryIndex()};
-
     std::lock_guard lock(mutex_);
     assert(init_);
 
     if (!canAdd_)
         return fail("cannot be stored at this time");
 
-    // Check shard count and available storage space
-    if (isHistoricalShard && numHistoricalShards(lock) >= maxHistoricalShards_)
-        return fail("maximum number of historical shards reached");
+    auto historicalShardsToPrepare = 0;
 
-    if (!sufficientStorage(
-            1,
-            isHistoricalShard ? PathDesignation::historical
-                              : PathDesignation::none,
-            lock))
+    for (auto const shardIndex : shardIndexes)
     {
-        return fail("insufficient storage space available");
+        if (shardIndex < earliestShardIndex())
+        {
+            return fail(
+                "comes before earliest shard index " +
+                    std::to_string(earliestShardIndex()),
+                shardIndex);
+        }
+
+        // If we are synced to the network, check if the shard index is
+        // greater or equal to the current or validated shard index.
+        auto seqCheck = [&](std::uint32_t ledgerSeq) {
+            if (ledgerSeq >= earliestLedgerSeq() &&
+                shardIndex >= seqToShardIndex(ledgerSeq))
+            {
+                return fail("invalid index", shardIndex);
+            }
+            return true;
+        };
+        if (!seqCheck(app_.getLedgerMaster().getValidLedgerIndex() + 1) ||
+            !seqCheck(app_.getLedgerMaster().getCurrentLedgerIndex()))
+        {
+            return fail("invalid index", shardIndex);
+        }
+
+        if (shards_.find(shardIndex) != shards_.end())
+            return fail("is already stored", shardIndex);
+
+        if (preparedIndexes_.find(shardIndex) != preparedIndexes_.end())
+            return fail("is already queued for import", shardIndex);
+
+        // Any shard earlier than the two most recent shards
+        // is a historical shard
+        if (shardIndex < shardBoundaryIndex())
+            ++historicalShardsToPrepare;
     }
 
-    if (shards_.find(shardIndex) != shards_.end())
-        return fail("already stored");
-
-    if (!preparedIndexes_.emplace(shardIndex).second)
-        return fail("already queued for import");
+    auto const numHistShards = numHistoricalShards(lock);
+
+    // Check shard count and available storage space
+    if (numHistShards + historicalShardsToPrepare > maxHistoricalShards_)
+        return fail("maximum number of historical shards reached");
+
+    if (historicalShardsToPrepare)
+    {
+        // Check available storage space for historical shards
+        if (!sufficientStorage(
+                historicalShardsToPrepare, PathDesignation::historical, lock))
+            return fail("insufficient storage space available");
+    }
+
+    if (auto const recentShardsToPrepare =
+            shardIndexes.size() - historicalShardsToPrepare;
+        recentShardsToPrepare)
+    {
+        // Check available storage space for recent shards
+        if (!sufficientStorage(
+                recentShardsToPrepare, PathDesignation::none, lock))
+            return fail("insufficient storage space available");
+    }
+
+    for (auto const shardIndex : shardIndexes)
+    {
+        auto const prepareSuccessful =
+            preparedIndexes_.emplace(shardIndex).second;
+
+        (void)prepareSuccessful;
+        assert(prepareSuccessful);
+    }
 
     return true;
 }
diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h
index e35779eda..500678e59 100644
--- a/src/ripple/nodestore/impl/DatabaseShardImp.h
+++ b/src/ripple/nodestore/impl/DatabaseShardImp.h
@@ -55,7 +55,7 @@ public:
     prepareLedger(std::uint32_t validLedgerSeq) override;
 
     bool
-    prepareShard(std::uint32_t shardIndex) override;
+    prepareShards(std::vector<std::uint32_t> const& shardIndexes) override;
 
     void
     removePreShard(std::uint32_t shardIndex) override;
diff --git a/src/ripple/rpc/impl/ShardArchiveHandler.cpp b/src/ripple/rpc/impl/ShardArchiveHandler.cpp
index cabee0863..18e49c6a8 100644
--- a/src/ripple/rpc/impl/ShardArchiveHandler.cpp
+++ b/src/ripple/rpc/impl/ShardArchiveHandler.cpp
@@ -247,9 +247,6 @@ ShardArchiveHandler::add(
     if (it != archives_.end())
         return url == it->second;
 
-    if (!app_.getShardStore()->prepareShard(shardIndex))
-        return false;
-
     archives_.emplace(shardIndex, std::move(url));
 
     return true;
@@ -275,6 +272,16 @@ ShardArchiveHandler::start()
         return false;
     }
 
+    std::vector<std::uint32_t> shardIndexes(archives_.size());
+    std::transform(
+        archives_.begin(),
+        archives_.end(),
+        shardIndexes.begin(),
+        [](auto const& entry) { return entry.first; });
+
+    if (!app_.getShardStore()->prepareShards(shardIndexes))
+        return false;
+
     try
     {
         // Create temp root download directory
diff --git a/src/test/jtx/CaptureLogs.h b/src/test/jtx/CaptureLogs.h
index 30a562e99..9a64396ab 100644
--- a/src/test/jtx/CaptureLogs.h
+++ b/src/test/jtx/CaptureLogs.h
@@ -30,6 +30,7 @@ namespace test {
  */
 class CaptureLogs : public Logs
 {
+    std::mutex strmMutex_;
     std::stringstream strm_;
     std::string* pResult_;
 
@@ -38,13 +39,17 @@ class CaptureLogs : public Logs
      */
    class CaptureSink : public beast::Journal::Sink
    {
+        std::mutex& strmMutex_;
        std::stringstream& strm_;
 
    public:
        CaptureSink(
            beast::severities::Severity threshold,
+            std::mutex& mutex,
            std::stringstream& strm)
-            : beast::Journal::Sink(threshold, false), strm_(strm)
+            : beast::Journal::Sink(threshold, false)
+            , strmMutex_(mutex)
+            , strm_(strm)
        {
        }
 
@@ -52,6 +57,7 @@ class CaptureLogs : public Logs
        write(beast::severities::Severity level, std::string const& text)
            override
        {
+            std::lock_guard lock(strmMutex_);
            strm_ << text;
        }
    };
@@ -72,7 +78,7 @@ public:
        std::string const& partition,
        beast::severities::Severity threshold) override
    {
-        return std::make_unique<CaptureSink>(threshold, strm_);
+        return std::make_unique<CaptureSink>(threshold, strmMutex_, strm_);
    }
 };
diff --git a/src/test/nodestore/DatabaseShard_test.cpp b/src/test/nodestore/DatabaseShard_test.cpp
index d760abf4e..c53568e6d 100644
--- a/src/test/nodestore/DatabaseShard_test.cpp
+++ b/src/test/nodestore/DatabaseShard_test.cpp
@@ -647,9 +647,9 @@ class DatabaseShard_test : public TestBase
     }
 
     void
-    testPrepareShard(std::uint64_t const seedValue)
+    testPrepareShards(std::uint64_t const seedValue)
     {
-        testcase("Prepare shard");
+        testcase("Prepare shards");
 
         using namespace test::jtx;
 
@@ -675,7 +675,7 @@ class DatabaseShard_test : public TestBase
             }
             else
             {
-                db->prepareShard(n);
+                db->prepareShards({n});
                 bitMask |= 1ll << n;
             }
             BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask));
@@ -683,11 +683,11 @@ class DatabaseShard_test : public TestBase
 
         // test illegal cases
        // adding shards with too large number
-        db->prepareShard(0);
+        db->prepareShards({0});
         BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask));
-        db->prepareShard(nTestShards + 1);
+        db->prepareShards({nTestShards + 1});
         BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask));
-        db->prepareShard(nTestShards + 2);
+        db->prepareShards({nTestShards + 2});
         BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask));
 
         // create shards which are not prepared for import
@@ -753,7 +753,7 @@ class DatabaseShard_test : public TestBase
         if (!BEAST_EXPECT(data.makeLedgers(env)))
             return;
 
-        db->prepareShard(1);
+        db->prepareShards({1});
         BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(2));
 
         using namespace boost::filesystem;
@@ -1338,7 +1338,7 @@ public:
         testCreateShard(seedValue);
         testReopenDatabase(seedValue + 10);
         testGetCompleteShards(seedValue + 20);
-        testPrepareShard(seedValue + 30);
+        testPrepareShards(seedValue + 30);
         testImportShard(seedValue + 40);
         testCorruptedDatabase(seedValue + 50);
         testIllegalFinalKey(seedValue + 60);
diff --git a/src/test/rpc/ShardArchiveHandler_test.cpp b/src/test/rpc/ShardArchiveHandler_test.cpp
index c22bb4b0b..98d18b2f3 100644
--- a/src/test/rpc/ShardArchiveHandler_test.cpp
+++ b/src/test/rpc/ShardArchiveHandler_test.cpp
@@ -24,6 +24,7 @@
 #include
 #include
 #include
+#include <test/jtx/CaptureLogs.h>
 #include
 #include
 #include
@@ -49,9 +50,9 @@ class ShardArchiveHandler_test : public beast::unit_test::suite
     }
 
 public:
-    // Test the shard downloading module by initiating
-    // and completing a download and verifying the
-    // contents of the state database.
+    // Test the shard downloading module by queueing
+    // a download and verifying the contents of the
+    // state database.
     void
     testSingleDownloadAndStateDB()
     {
@@ -98,9 +99,9 @@ public:
         handler->release();
     }
 
-    // Test the shard downloading module by initiating
-    // and completing three downloads and verifying
-    // the contents of the state database.
+    // Test the shard downloading module by queueing
+    // three downloads and verifying the contents of
+    // the state database.
     void
     testDownloadsAndStateDB()
     {
@@ -414,6 +415,251 @@ public:
         BEAST_EXPECT(!boost::filesystem::exists(stateDir));
     }
 
+    // Ensure that downloads fail when the shard
+    // database cannot store any more shards
+    void
+    testShardCountFailure()
+    {
+        testcase("testShardCountFailure");
+        std::string capturedLogs;
+
+        {
+            beast::temp_dir tempDir;
+
+            auto c = jtx::envconfig();
+            auto& section = c->section(ConfigSection::shardDatabase());
+            section.set("path", tempDir.path());
+            section.set("max_historical_shards", "1");
+            section.set("ledgers_per_shard", "256");
+            section.set("earliest_seq", "257");
+            auto& sectionNode = c->section(ConfigSection::nodeDatabase());
+            sectionNode.set("earliest_seq", "257");
+            c->setupControl(true, true, true);
+
+            std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
+            jtx::Env env(*this, std::move(c), std::move(logs));
+
+            std::uint8_t const numberOfDownloads = 10;
+
+            // Create some ledgers so that the ShardArchiveHandler
+            // can verify the last ledger hash for the shard
+            // downloads.
+            for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
+                     (numberOfDownloads + 1);
+                 ++i)
+            {
+                env.close();
+            }
+
+            auto handler = env.app().getShardArchiveHandler();
+            BEAST_EXPECT(handler);
+            BEAST_EXPECT(
+                dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);
+
+            auto server = createServer(env);
+            auto host = server->local_endpoint().address().to_string();
+            auto port = std::to_string(server->local_endpoint().port());
+            server->stop();
+
+            Downloads const dl = [count = numberOfDownloads, &host, &port] {
+                Downloads ret;
+
+                for (int i = 1; i <= count; ++i)
+                {
+                    ret.push_back(
+                        {i,
+                         (boost::format("https://%s:%d/%d.tar.lz4") % host %
+                          port % i)
+                             .str()});
+                }
+
+                return ret;
+            }();
+
+            for (auto const& entry : dl)
+            {
+                parsedURL url;
+                parseUrl(url, entry.second);
+                handler->add(entry.first, {url, entry.second});
+            }
+
+            BEAST_EXPECT(!handler->start());
+            auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
+                env.app().config());
+
+            handler->release();
+            BEAST_EXPECT(!boost::filesystem::exists(stateDir));
+        }
+
+        auto const expectedErrorMessage =
+            "shards 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 maximum number of historical "
+            "shards reached";
+        BEAST_EXPECT(
+            capturedLogs.find(expectedErrorMessage) != std::string::npos);
+
+        {
+            beast::temp_dir tempDir;
+
+            auto c = jtx::envconfig();
+            auto& section = c->section(ConfigSection::shardDatabase());
+            section.set("path", tempDir.path());
+            section.set("max_historical_shards", "0");
+            section.set("ledgers_per_shard", "256");
+            section.set("earliest_seq", "257");
+            auto& sectionNode = c->section(ConfigSection::nodeDatabase());
+            sectionNode.set("earliest_seq", "257");
+            c->setupControl(true, true, true);
+
+            std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
+            jtx::Env env(*this, std::move(c), std::move(logs));
+
+            std::uint8_t const numberOfDownloads = 1;
+
+            // Create some ledgers so that the ShardArchiveHandler
+            // can verify the last ledger hash for the shard
+            // downloads.
+            for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
+                     ((numberOfDownloads * 3) + 1);
+                 ++i)
+            {
+                env.close();
+            }
+
+            auto handler = env.app().getShardArchiveHandler();
+            BEAST_EXPECT(handler);
+            BEAST_EXPECT(
+                dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);
+
+            auto server = createServer(env);
+            auto host = server->local_endpoint().address().to_string();
+            auto port = std::to_string(server->local_endpoint().port());
+            server->stop();
+
+            Downloads const dl = [count = numberOfDownloads, &host, &port] {
+                Downloads ret;
+
+                for (int i = 1; i <= count; ++i)
+                {
+                    ret.push_back(
+                        {i,
+                         (boost::format("https://%s:%d/%d.tar.lz4") % host %
+                          port % i)
+                             .str()});
+                }
+
+                return ret;
+            }();
+
+            for (auto const& entry : dl)
+            {
+                parsedURL url;
+                parseUrl(url, entry.second);
+                handler->add(entry.first, {url, entry.second});
+            }
+
+            BEAST_EXPECT(!handler->start());
+            auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
+                env.app().config());
+
+            handler->release();
+            BEAST_EXPECT(!boost::filesystem::exists(stateDir));
+        }
+
+        auto const expectedErrorMessage2 =
+            "shard 1 maximum number of historical shards reached";
+        BEAST_EXPECT(
+            capturedLogs.find(expectedErrorMessage2) != std::string::npos);
+    }
+
+    // Ensure that downloads fail when the shard
+    // database has already stored one of the
+    // queued shards
+    void
+    testRedundantShardFailure()
+    {
+        testcase("testRedundantShardFailure");
+        std::string capturedLogs;
+
+        {
+            beast::temp_dir tempDir;
+
+            auto c = jtx::envconfig();
+            auto& section = c->section(ConfigSection::shardDatabase());
+            section.set("path", tempDir.path());
+            section.set("max_historical_shards", "1");
+            section.set("ledgers_per_shard", "256");
+            section.set("earliest_seq", "257");
+            auto& sectionNode = c->section(ConfigSection::nodeDatabase());
+            sectionNode.set("earliest_seq", "257");
+            c->setupControl(true, true, true);
+
+            std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
+            jtx::Env env(
+                *this,
+                std::move(c),
+                std::move(logs),
+                beast::severities::kDebug);
+
+            std::uint8_t const numberOfDownloads = 10;
+
+            // Create some ledgers so that the ShardArchiveHandler
+            // can verify the last ledger hash for the shard
+            // downloads.
+            for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
+                     (numberOfDownloads + 1);
+                 ++i)
+            {
+                env.close();
+            }
+
+            env.app().getShardStore()->prepareShards({1});
+
+            auto handler = env.app().getShardArchiveHandler();
+            BEAST_EXPECT(handler);
+            BEAST_EXPECT(
+                dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);
+
+            auto server = createServer(env);
+            auto host = server->local_endpoint().address().to_string();
+            auto port = std::to_string(server->local_endpoint().port());
+            server->stop();
+
+            Downloads const dl = [count = numberOfDownloads, &host, &port] {
+                Downloads ret;
+
+                for (int i = 1; i <= count; ++i)
+                {
+                    ret.push_back(
+                        {i,
+                         (boost::format("https://%s:%d/%d.tar.lz4") % host %
+                          port % i)
+                             .str()});
+                }
+
+                return ret;
+            }();
+
+            for (auto const& entry : dl)
+            {
+                parsedURL url;
+                parseUrl(url, entry.second);
+                handler->add(entry.first, {url, entry.second});
+            }
+
+            BEAST_EXPECT(!handler->start());
+            auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
+                env.app().config());
+
+            handler->release();
+            BEAST_EXPECT(!boost::filesystem::exists(stateDir));
+        }
+
+        auto const expectedErrorMessage =
+            "shard 1 is already queued for import";
+        BEAST_EXPECT(
+            capturedLogs.find(expectedErrorMessage) != std::string::npos);
+    }
+
     void
     run() override
     {
@@ -421,6 +667,8 @@
         testDownloadsAndStateDB();
         testDownloadsAndFileSystem();
         testDownloadsAndRestart();
+        testShardCountFailure();
+        testRedundantShardFailure();
     }
 };
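The DatabaseCon.cpp change above relies on a small ownership pattern: drop every strong reference to the shared Checkpointer, then poll a std::weak_ptr until whatever checkpoint is still running on another thread releases its own reference. A minimal, self-contained sketch of that pattern follows; the Worker type, timings, and thread setup are hypothetical illustrations, not rippled code.

#include <chrono>
#include <memory>
#include <thread>

// Stand-in for the object whose last in-flight use we must wait for
// (hypothetical; plays the role of the Checkpointer in the diff).
struct Worker
{
    void
    run()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
    }
};

int
main()
{
    auto worker = std::make_shared<Worker>();

    // Another thread holds its own strong reference while it works and
    // explicitly releases it when the work is done.
    std::thread t([w = worker]() mutable {
        w->run();
        w.reset();
    });

    std::weak_ptr<Worker> wk(worker);
    worker.reset();  // drop our strong reference

    // A nonzero use count means the worker thread is still mid-task; poll
    // until it releases its reference, as ~DatabaseCon() does before letting
    // a new connection to the same database be opened.
    while (wk.use_count())
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

    t.join();
}

The polling loop trades a little latency for simplicity; a condition variable signalled by the last user would avoid the sleeps, but the diff keeps the dependency-free busy-wait, presumably because the wait window is short and the destructor path is cold.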