From 08371ba2c4b55541fc8764b75131660632b43876 Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Thu, 10 Jan 2019 17:32:08 -0500 Subject: [PATCH] Improve shard downloader status reporting --- src/ripple/app/main/Application.cpp | 2 +- src/ripple/net/SSLHTTPDownloader.h | 7 + src/ripple/net/impl/SSLHTTPDownloader.cpp | 97 ++++++---- src/ripple/nodestore/DatabaseShard.h | 6 +- src/ripple/nodestore/impl/Database.cpp | 12 +- .../nodestore/impl/DatabaseShardImp.cpp | 81 ++++---- src/ripple/nodestore/impl/DatabaseShardImp.h | 4 +- src/ripple/overlay/impl/PeerImp.cpp | 6 +- src/ripple/rpc/ShardArchiveHandler.h | 38 ++-- src/ripple/rpc/handlers/DownloadShard.cpp | 23 ++- src/ripple/rpc/impl/ServerHandlerImp.cpp | 4 +- src/ripple/rpc/impl/ShardArchiveHandler.cpp | 182 +++++++++++------- 12 files changed, 270 insertions(+), 192 deletions(-) diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 77ae7dcc95..6cd39bcd4a 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -225,7 +225,7 @@ public: reset () override { { - std::lock_guard l(maxSeqLock); + std::lock_guard lock(maxSeqLock); maxSeq = 0; } fullbelow_.reset(); diff --git a/src/ripple/net/SSLHTTPDownloader.h b/src/ripple/net/SSLHTTPDownloader.h index fb9c49abfc..54aa868b2c 100644 --- a/src/ripple/net/SSLHTTPDownloader.h +++ b/src/ripple/net/SSLHTTPDownloader.h @@ -80,6 +80,13 @@ private: boost::filesystem::path dstPath, std::function complete, boost::asio::yield_context yield); + + void + fail( + boost::filesystem::path dstPath, + std::function const& complete, + boost::system::error_code const& ec, + std::string const& errMsg); }; } // ripple diff --git a/src/ripple/net/impl/SSLHTTPDownloader.cpp b/src/ripple/net/impl/SSLHTTPDownloader.cpp index 8d1c7238e9..092c8325d9 100644 --- a/src/ripple/net/impl/SSLHTTPDownloader.cpp +++ b/src/ripple/net/impl/SSLHTTPDownloader.cpp @@ -132,31 +132,21 @@ SSLHTTPDownloader::do_session( using namespace 
boost::beast; boost::system::error_code ec; - auto fail = [&](std::string errMsg) - { - if (ec != boost::asio::error::operation_aborted) - { - JLOG(j_.error()) << - errMsg << ": " << ec.message(); - } - try - { - remove(dstPath); - } - catch (std::exception const& e) - { - JLOG(j_.error()) << - "exception: " << e.what(); - } - complete(std::move(dstPath)); - }; - ip::tcp::resolver resolver {strand_.get_io_context()}; auto const results = resolver.async_resolve(host, port, yield[ec]); if (ec) - return fail("async_resolve"); + return fail(dstPath, complete, ec, "async_resolve"); + + try + { + stream_.emplace(strand_.get_io_service(), ctx_); + } + catch (std::exception const& e) + { + return fail(dstPath, complete, ec, + std::string("exception: ") + e.what()); + } - stream_.emplace(strand_.get_io_service(), ctx_); if (ssl_verify_) { // If we intend to verify the SSL connection, we need to set the @@ -165,36 +155,36 @@ SSLHTTPDownloader::do_session( { ec.assign(static_cast( ::ERR_get_error()), boost::asio::error::get_ssl_category()); - return fail("SSL_set_tlsext_host_name"); + return fail(dstPath, complete, ec, "SSL_set_tlsext_host_name"); } } else { stream_->set_verify_mode(boost::asio::ssl::verify_none, ec); if (ec) - return fail("set_verify_mode"); + return fail(dstPath, complete, ec, "set_verify_mode"); } boost::asio::async_connect( stream_->next_layer(), results.begin(), results.end(), yield[ec]); if (ec) - return fail("async_connect"); + return fail(dstPath, complete, ec, "async_connect"); if (ssl_verify_) { stream_->set_verify_mode(boost::asio::ssl::verify_peer, ec); if (ec) - return fail("set_verify_mode"); + return fail(dstPath, complete, ec, "set_verify_mode"); stream_->set_verify_callback( boost::asio::ssl::rfc2818_verification(host.c_str()), ec); if (ec) - return fail("set_verify_callback"); + return fail(dstPath, complete, ec, "set_verify_callback"); } stream_->async_handshake(ssl::stream_base::client, yield[ec]); if (ec) - return fail("async_handshake"); 
+ return fail(dstPath, complete, ec, "async_handshake"); // Set up an HTTP HEAD request message to find the file size http::request req {http::verb::head, target, version}; @@ -203,7 +193,7 @@ SSLHTTPDownloader::do_session( http::async_write(*stream_, req, yield[ec]); if(ec) - return fail("async_write"); + return fail(dstPath, complete, ec, "async_write"); { // Check if available storage for file size @@ -211,19 +201,21 @@ SSLHTTPDownloader::do_session( p.skip(true); http::async_read(*stream_, read_buf_, p, yield[ec]); if(ec) - return fail("async_read"); + return fail(dstPath, complete, ec, "async_read"); if (auto len = p.content_length()) { try { if (*len > space(dstPath.parent_path()).available) - return fail("Insufficient disk space for download"); + { + return fail(dstPath, complete, ec, + "Insufficient disk space for download"); + } } catch (std::exception const& e) { - JLOG(j_.error()) << - "exception: " << e.what(); - return fail({}); + return fail(dstPath, complete, ec, + std::string("exception: ") + e.what()); } } } @@ -232,7 +224,7 @@ SSLHTTPDownloader::do_session( req.method(http::verb::get); http::async_write(*stream_, req, yield[ec]); if(ec) - return fail("async_write"); + return fail(dstPath, complete, ec, "async_write"); // Download the file http::response_parser p; @@ -244,14 +236,14 @@ SSLHTTPDownloader::do_session( if (ec) { p.get().body().close(); - return fail("open"); + return fail(dstPath, complete, ec, "open"); } http::async_read(*stream_, read_buf_, p, yield[ec]); if (ec) { p.get().body().close(); - return fail("async_read"); + return fail(dstPath, complete, ec, "async_read"); } p.get().body().close(); @@ -266,6 +258,7 @@ SSLHTTPDownloader::do_session( JLOG(j_.trace()) << "async_shutdown: " << ec.message(); } + // The socket cannot be reused stream_ = boost::none; JLOG(j_.trace()) << @@ -275,4 +268,36 @@ SSLHTTPDownloader::do_session( complete(std::move(dstPath)); } +void +SSLHTTPDownloader::fail( + boost::filesystem::path dstPath, + 
std::function const& complete, + boost::system::error_code const& ec, + std::string const& errMsg) +{ + if (!ec) + { + JLOG(j_.error()) << + errMsg; + } + else if (ec != boost::asio::error::operation_aborted) + { + JLOG(j_.error()) << + errMsg << ": " << ec.message(); + } + + try + { + remove(dstPath); + } + catch (std::exception const& e) + { + JLOG(j_.error()) << + "exception: " << e.what(); + } + complete(std::move(dstPath)); +} + + + }// ripple diff --git a/src/ripple/nodestore/DatabaseShard.h b/src/ripple/nodestore/DatabaseShard.h index c43f9ccc55..0af67d7215 100644 --- a/src/ripple/nodestore/DatabaseShard.h +++ b/src/ripple/nodestore/DatabaseShard.h @@ -99,11 +99,11 @@ public: /** Get shard indexes being imported - @return The number of shards prepared for import + @return a string representing the shards prepared for import */ virtual - std::uint32_t - getNumPreShard() = 0; + std::string + getPreShards() = 0; /** Import a shard into the shard database diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index d172a5bcd0..2d7bdced46 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -63,7 +63,7 @@ Database::~Database() void Database::waitReads() { - std::unique_lock l(readLock_); + std::unique_lock lock(readLock_); // Wake in two generations. // Each generation is a full pass over the space. // If we're in generation N and you issue a request, @@ -75,7 +75,7 @@ Database::waitReads() // you know the request is done. std::uint64_t const wakeGen = readGen_ + 2; while (! readShut_ && ! read_.empty() && (readGen_ < wakeGen)) - readGenCondVar_.wait(l); + readGenCondVar_.wait(lock); } void @@ -91,7 +91,7 @@ void Database::stopThreads() { { - std::lock_guard l(readLock_); + std::lock_guard lock(readLock_); if (readShut_) // Only stop threads once. 
return; @@ -110,7 +110,7 @@ Database::asyncFetch(uint256 const& hash, std::uint32_t seq, std::shared_ptr> const& nCache) { // Post a read - std::lock_guard l(readLock_); + std::lock_guard lock(readLock_); if (read_.emplace(hash, std::make_tuple(seq, pCache, nCache)).second) readCondVar_.notify_one(); } @@ -358,12 +358,12 @@ Database::threadEntry() std::shared_ptr> lastPcache; std::shared_ptr> lastNcache; { - std::unique_lock l(readLock_); + std::unique_lock lock(readLock_); while (! readShut_ && read_.empty()) { // All work is done readGenCondVar_.notify_all(); - readCondVar_.wait(l); + readCondVar_.wait(lock); } if (readShut_) break; diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 3667b7cdf8..033b09602e 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -64,7 +64,7 @@ bool DatabaseShardImp::init() { using namespace boost::filesystem; - std::lock_guard l(m_); + std::lock_guard lock(m_); if (init_) { assert(false); @@ -173,7 +173,7 @@ DatabaseShardImp::init() std::max(1, maxDiskSpace_ / avgShardSz_)); } else - updateStats(l); + updateStats(lock); init_ = true; return true; } @@ -181,7 +181,7 @@ DatabaseShardImp::init() boost::optional DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) { - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); if (incomplete_) return incomplete_->prepare(); @@ -206,7 +206,7 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) } } - auto const shardIndex {findShardIndexToAdd(validLedgerSeq, l)}; + auto const shardIndex {findShardIndexToAdd(validLedgerSeq, lock)}; if (!shardIndex) { JLOG(j_.debug()) << @@ -232,7 +232,7 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) bool DatabaseShardImp::prepareShard(std::uint32_t shardIndex) { - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); if (!canAdd_) { @@ -316,18 +316,25 @@ 
DatabaseShardImp::prepareShard(std::uint32_t shardIndex) void DatabaseShardImp::removePreShard(std::uint32_t shardIndex) { - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); preShards_.erase(shardIndex); } -std::uint32_t -DatabaseShardImp::getNumPreShard() +std::string +DatabaseShardImp::getPreShards() { - std::lock_guard l(m_); - assert(init_); - return preShards_.size(); -} + RangeSet rs; + { + std::lock_guard lock(m_); + assert(init_); + if (preShards_.empty()) + return {}; + for (auto const& ps : preShards_) + rs.insert(ps.first); + } + return to_string(rs); +}; bool DatabaseShardImp::importShard(std::uint32_t shardIndex, @@ -365,7 +372,7 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex, return true; }; - std::unique_lock l(m_); + std::unique_lock lock(m_); assert(init_); // Check shard is prepared @@ -414,9 +421,9 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex, // Shard validation requires releasing the lock // so the database can fetch data from it it->second = shard.get(); - l.unlock(); + lock.unlock(); auto const valid {shard->validate(app_)}; - l.lock(); + lock.lock(); if (!valid) { it = preShards_.find(shardIndex); @@ -491,7 +498,7 @@ DatabaseShardImp::setStored(std::shared_ptr const& ledger) return; } auto const shardIndex {seqToShardIndex(ledger->info().seq)}; - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); if (!incomplete_ || shardIndex != incomplete_->index()) { @@ -514,7 +521,7 @@ DatabaseShardImp::setStored(std::shared_ptr const& ledger) { complete_.emplace(incomplete_->index(), std::move(incomplete_)); incomplete_.reset(); - updateStats(l); + updateStats(lock); // Update peers with new shard index protocol::TMShardInfo message; @@ -530,7 +537,7 @@ bool DatabaseShardImp::contains(std::uint32_t seq) { auto const shardIndex {seqToShardIndex(seq)}; - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); if (complete_.find(shardIndex) != complete_.end()) return true; @@ -542,7 +549,7 
@@ DatabaseShardImp::contains(std::uint32_t seq) std::string DatabaseShardImp::getCompleteShards() { - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); return status_; } @@ -551,7 +558,7 @@ void DatabaseShardImp::validate() { { - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); if (complete_.empty() && !incomplete_) { @@ -586,7 +593,7 @@ DatabaseShardImp::validate() void DatabaseShardImp::import(Database& source) { - std::unique_lock l(m_); + std::unique_lock lock(m_); assert(init_); // Only the application local node store can be imported @@ -772,7 +779,7 @@ DatabaseShardImp::import(Database& source) complete_.clear(); incomplete_.reset(); usedDiskSpace_ = 0; - l.unlock(); + lock.unlock(); if (!init()) Throw("Failed to initialize"); @@ -783,7 +790,7 @@ DatabaseShardImp::getWriteLoad() const { std::int32_t wl {0}; { - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); for (auto const& c : complete_) wl += c.second->getBackend()->getWriteLoad(); @@ -803,7 +810,7 @@ DatabaseShardImp::store(NodeObjectType type, std::shared_ptr nObj; auto const shardIndex {seqToShardIndex(seq)}; { - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); if (!incomplete_ || shardIndex != incomplete_->index()) { @@ -851,7 +858,7 @@ bool DatabaseShardImp::copyLedger(std::shared_ptr const& ledger) { auto const shardIndex {seqToShardIndex(ledger->info().seq)}; - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); if (!incomplete_ || shardIndex != incomplete_->index()) { @@ -881,7 +888,7 @@ DatabaseShardImp::copyLedger(std::shared_ptr const& ledger) { complete_.emplace(incomplete_->index(), std::move(incomplete_)); incomplete_.reset(); - updateStats(l); + updateStats(lock); } return true; } @@ -891,7 +898,7 @@ DatabaseShardImp::getDesiredAsyncReadCount(std::uint32_t seq) { auto const shardIndex {seqToShardIndex(seq)}; { - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); auto it = 
complete_.find(shardIndex); if (it != complete_.end()) @@ -907,7 +914,7 @@ DatabaseShardImp::getCacheHitRate() { float sz, f {0}; { - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); sz = complete_.size(); for (auto const& c : complete_) @@ -924,11 +931,11 @@ DatabaseShardImp::getCacheHitRate() void DatabaseShardImp::tune(int size, std::chrono::seconds age) { - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); cacheSz_ = size; cacheAge_ = age; - int const sz {calcTargetCacheSz(l)}; + int const sz {calcTargetCacheSz(lock)}; for (auto const& c : complete_) { c.second->pCache()->setTargetSize(sz); @@ -948,9 +955,9 @@ DatabaseShardImp::tune(int size, std::chrono::seconds age) void DatabaseShardImp::sweep() { - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); - int const sz {calcTargetCacheSz(l)}; + int const sz {calcTargetCacheSz(lock)}; for (auto const& c : complete_) { c.second->pCache()->sweep(); @@ -971,19 +978,19 @@ std::shared_ptr DatabaseShardImp::fetchFrom(uint256 const& hash, std::uint32_t seq) { auto const shardIndex {seqToShardIndex(seq)}; - std::unique_lock l(m_); + std::unique_lock lock(m_); assert(init_); { auto it = complete_.find(shardIndex); if (it != complete_.end()) { - l.unlock(); + lock.unlock(); return fetchInternal(hash, *it->second->getBackend()); } } if (incomplete_ && incomplete_->index() == shardIndex) { - l.unlock(); + lock.unlock(); return fetchInternal(hash, *incomplete_->getBackend()); } @@ -991,7 +998,7 @@ DatabaseShardImp::fetchFrom(uint256 const& hash, std::uint32_t seq) auto it = preShards_.find(shardIndex); if (it != preShards_.end() && it->second) { - l.unlock(); + lock.unlock(); return fetchInternal(hash, *it->second->getBackend()); } return {}; @@ -1109,7 +1116,7 @@ std::pair, std::shared_ptr> DatabaseShardImp::selectCache(std::uint32_t seq) { auto const shardIndex {seqToShardIndex(seq)}; - std::lock_guard l(m_); + std::lock_guard lock(m_); assert(init_); { auto it = 
complete_.find(shardIndex); diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index e6246843ae..535bdcab0a 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -51,8 +51,8 @@ public: void removePreShard(std::uint32_t shardIndex) override; - std::uint32_t - getNumPreShard() override; + std::string + getPreShards() override; bool importShard(std::uint32_t shardIndex, diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index ab48bdfd89..d561521d67 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -359,10 +359,8 @@ PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const return true; } - if (seq >= app_.getNodeStore().earliestSeq()) - return hasShard( - (seq - 1) / NodeStore::DatabaseShard::ledgersPerShardDefault); - return false; + return seq >= app_.getNodeStore().earliestSeq() && + hasShard(NodeStore::seqToShardIndex(seq)); } void diff --git a/src/ripple/rpc/ShardArchiveHandler.h b/src/ripple/rpc/ShardArchiveHandler.h index 45c0b4556b..22fef13e5f 100644 --- a/src/ripple/rpc/ShardArchiveHandler.h +++ b/src/ripple/rpc/ShardArchiveHandler.h @@ -46,48 +46,44 @@ public: ~ShardArchiveHandler(); - /** Initializes the handler. - @return `true` if successfully initialized. - */ - bool - init(); - - /** Queue an archive to be downloaded and imported. + /** Add an archive to be downloaded and imported. @param shardIndex the index of the shard to be imported. @param url the location of the archive. @return `true` if successfully added. + @note Returns false if called while downloading. */ bool add(std::uint32_t shardIndex, parsedURL&& url); - /** Starts downloading and importing of queued archives. */ - void - next(); - - /** Returns indexes of queued archives. - @return indexes of queued archives. 
- */ - std::string - toString() const; + /** Starts downloading and importing archives. */ + bool + start(); private: - // The callback used by the downloader to notify completion of a download. + // Begins the download and import of the next archive. + bool + next(std::lock_guard& l); + + // Callback used by the downloader to notify completion of a download. void complete(boost::filesystem::path dstPath); - // A job to extract an archive and import a shard. + // Extract a downloaded archive and import it into the shard store. void process(boost::filesystem::path const& dstPath); + // Remove the archive being processed. void - remove(std::uint32_t shardIndex); + remove(std::lock_guard&); + std::mutex mutable m_; Application& app_; std::shared_ptr downloader_; - std::map archives_; - bool const validate_; boost::filesystem::path const downloadDir_; + bool const validate_; boost::asio::basic_waitable_timer timer_; + bool process_; + std::map archives_; beast::Journal j_; }; diff --git a/src/ripple/rpc/handlers/DownloadShard.cpp b/src/ripple/rpc/handlers/DownloadShard.cpp index 883683260e..456823785a 100644 --- a/src/ripple/rpc/handlers/DownloadShard.cpp +++ b/src/ripple/rpc/handlers/DownloadShard.cpp @@ -57,9 +57,15 @@ doDownloadShard(RPC::Context& context) if (!shardStore) return rpcError(rpcNOT_ENABLED); - // Deny request if already downloading - if (shardStore->getNumPreShard()) - return rpcError(rpcTOO_BUSY); + // Return status update if already downloading + auto preShards {shardStore->getPreShards()}; + if (!preShards.empty()) + { + std::string s {"Download in progress. Shard"}; + if (!std::all_of(preShards.begin(), preShards.end(), ::isdigit)) + s += "s"; + return RPC::makeObjectValue(s + " " + preShards); + } if (!context.params.isMember(jss::shards)) return RPC::missing_field_error(jss::shards); @@ -132,8 +138,6 @@ doDownloadShard(RPC::Context& context) // Begin downloading. The handler keeps itself alive while downloading. 
auto handler { std::make_shared(context.app, validate)}; - if (!handler->init()) - return rpcError(rpcINTERNAL); for (auto& ar : archives) { if (!handler->add(ar.first, std::move(ar.second))) @@ -143,9 +147,14 @@ doDownloadShard(RPC::Context& context) std::to_string(ar.first) + " exists or being acquired"); } } - handler->next(); + if (!handler->start()) + return rpcError(rpcINTERNAL); - return RPC::makeObjectValue("downloading shards " + handler->toString()); + std::string s {"Downloading shard"}; + preShards = shardStore->getPreShards(); + if (!std::all_of(preShards.begin(), preShards.end(), ::isdigit)) + s += "s"; + return RPC::makeObjectValue(s + " " + preShards); } } // ripple diff --git a/src/ripple/rpc/impl/ServerHandlerImp.cpp b/src/ripple/rpc/impl/ServerHandlerImp.cpp index 12f0101361..0a1966358b 100644 --- a/src/ripple/rpc/impl/ServerHandlerImp.cpp +++ b/src/ripple/rpc/impl/ServerHandlerImp.cpp @@ -152,7 +152,7 @@ bool ServerHandlerImp::onAccept (Session& session, boost::asio::ip::tcp::endpoint endpoint) { - std::lock_guard l(countlock_); + std::lock_guard lock(countlock_); auto const c = ++count_[session.port()]; @@ -363,7 +363,7 @@ void ServerHandlerImp::onClose (Session& session, boost::system::error_code const&) { - std::lock_guard l(countlock_); + std::lock_guard lock(countlock_); --count_[session.port()]; } diff --git a/src/ripple/rpc/impl/ShardArchiveHandler.cpp b/src/ripple/rpc/impl/ShardArchiveHandler.cpp index b45a7064a8..c776a80bdc 100644 --- a/src/ripple/rpc/impl/ShardArchiveHandler.cpp +++ b/src/ripple/rpc/impl/ShardArchiveHandler.cpp @@ -33,10 +33,11 @@ using namespace std::chrono_literals; ShardArchiveHandler::ShardArchiveHandler(Application& app, bool validate) : app_(app) - , validate_(validate) , downloadDir_(get(app_.config().section( ConfigSection::shardDatabase()), "path", "") + "/download") + , validate_(validate) , timer_(app_.getIOService()) + , process_(false) , j_(app.journal("ShardArchiveHandler")) { 
assert(app_.getShardStore()); @@ -44,9 +45,11 @@ ShardArchiveHandler::ShardArchiveHandler(Application& app, bool validate) ShardArchiveHandler::~ShardArchiveHandler() { + std::lock_guard lock(m_); timer_.cancel(); for (auto const& ar : archives_) app_.getShardStore()->removePreShard(ar.first); + archives_.clear(); // Remove temp root download directory try @@ -61,18 +64,45 @@ ShardArchiveHandler::~ShardArchiveHandler() } bool -ShardArchiveHandler::init() +ShardArchiveHandler::add(std::uint32_t shardIndex, parsedURL&& url) { + std::lock_guard lock(m_); + if (process_) + { + JLOG(j_.error()) << + "Download and import already in progress"; + return false; + } + + auto const it {archives_.find(shardIndex)}; + if (it != archives_.end()) + return url == it->second; + if (!app_.getShardStore()->prepareShard(shardIndex)) + return false; + archives_.emplace(shardIndex, std::move(url)); + return true; +} + +bool +ShardArchiveHandler::start() +{ + std::lock_guard lock(m_); if (!app_.getShardStore()) { JLOG(j_.error()) << "No shard store available"; return false; } - if (downloader_) + if (process_) { - JLOG(j_.error()) << - "Already initialized"; + JLOG(j_.warn()) << + "Archives already being processed"; + return false; + } + if (archives_.empty()) + { + JLOG(j_.warn()) << + "No archives to process"; return false; } @@ -91,36 +121,31 @@ ShardArchiveHandler::init() return false; } - downloader_ = std::make_shared( - app_.getIOService(), j_); - return downloader_->init(app_.config()); + if (!downloader_) + { + downloader_ = std::make_shared( + app_.getIOService(), j_); + if (!downloader_->init(app_.config())) + { + downloader_.reset(); + return false; + } + } + return next(lock); } bool -ShardArchiveHandler::add(std::uint32_t shardIndex, parsedURL&& url) +ShardArchiveHandler::next(std::lock_guard& l) { - assert(downloader_); - auto const it {archives_.find(shardIndex)}; - if (it != archives_.end()) - return url == it->second; - if 
(!app_.getShardStore()->prepareShard(shardIndex)) - return false; - archives_.emplace(shardIndex, std::move(url)); - return true; -} - -void -ShardArchiveHandler::next() -{ - assert(downloader_); - - // Check if all archives completed if (archives_.empty()) - return; + { + process_ = false; + return false; + } // Create a temp archive directory at the root - auto const dstDir { - downloadDir_ / std::to_string(archives_.begin()->first)}; + auto const shardIndex {archives_.begin()->first}; + auto const dstDir {downloadDir_ / std::to_string(shardIndex)}; try { create_directory(dstDir); @@ -129,53 +154,55 @@ ShardArchiveHandler::next() { JLOG(j_.error()) << "exception: " << e.what(); - remove(archives_.begin()->first); - return next(); + remove(l); + return next(l); } // Download the archive auto const& url {archives_.begin()->second}; - downloader_->download( + if (!downloader_->download( url.domain, std::to_string(url.port.get_value_or(443)), url.path, 11, dstDir / "archive.tar.lz4", std::bind(&ShardArchiveHandler::complete, - shared_from_this(), std::placeholders::_1)); -} + shared_from_this(), std::placeholders::_1))) + { + remove(l); + return next(l); + } -std::string -ShardArchiveHandler::toString() const -{ - assert(downloader_); - RangeSet rs; - for (auto const& ar : archives_) - rs.insert(ar.first); - return to_string(rs); -}; + process_ = true; + return true; +} void ShardArchiveHandler::complete(path dstPath) { - try { - if (!is_regular_file(dstPath)) + std::lock_guard lock(m_); + try { - auto ar {archives_.begin()}; - JLOG(j_.error()) << - "Downloading shard id " << ar->first << - " URL " << ar->second.domain << ar->second.path; - remove(ar->first); - return next(); + if (!is_regular_file(dstPath)) + { + auto ar {archives_.begin()}; + JLOG(j_.error()) << + "Downloading shard id " << ar->first << + " from URL " << ar->second.domain << ar->second.path; + remove(lock); + next(lock); + return; + } + } + catch (std::exception const& e) + { + JLOG(j_.error()) << 
+ "exception: " << e.what(); + remove(lock); + next(lock); + return; } - } - catch (std::exception const& e) - { - JLOG(j_.error()) << - "exception: " << e.what(); - remove(archives_.begin()->first); - return next(); } // Process in another thread to not hold up the IO service @@ -187,8 +214,9 @@ ShardArchiveHandler::complete(path dstPath) auto const mode {ptr->app_.getOPs().getOperatingMode()}; if (ptr->validate_ && mode != NetworkOPs::omFULL) { - timer_.expires_from_now(std::chrono::seconds{ - (NetworkOPs::omFULL - mode) * 10}); + std::lock_guard lock(m_); + timer_.expires_from_now(static_cast( + (NetworkOPs::omFULL - mode) * 10)); timer_.async_wait( [=, dstPath = std::move(dstPath), ptr = std::move(ptr)] (boost::system::error_code const& ec) @@ -196,21 +224,30 @@ ShardArchiveHandler::complete(path dstPath) if (ec != boost::asio::error::operation_aborted) ptr->complete(std::move(dstPath)); }); - return; } - ptr->process(dstPath); - ptr->next(); + else + { + ptr->process(dstPath); + std::lock_guard lock(m_); + remove(lock); + next(lock); + } }); } void ShardArchiveHandler::process(path const& dstPath) { - auto const shardIndex {archives_.begin()->first}; + std::uint32_t shardIndex; + { + std::lock_guard lock(m_); + shardIndex = archives_.begin()->first; + } + auto const shardDir {dstPath.parent_path() / std::to_string(shardIndex)}; try { - // Decompress and extract the downloaded file + // Extract the downloaded archive extractTarLz4(dstPath, dstPath.parent_path()); // The extracted root directory name must match the shard index @@ -219,14 +256,14 @@ ShardArchiveHandler::process(path const& dstPath) JLOG(j_.error()) << "Shard " << shardIndex << " mismatches archive shard directory"; - return remove(shardIndex); + return; } } catch (std::exception const& e) { JLOG(j_.error()) << "exception: " << e.what(); - return remove(shardIndex); + return; } // Import the shard into the shard store @@ -234,18 +271,17 @@ ShardArchiveHandler::process(path const& dstPath) { 
JLOG(j_.error()) << "Importing shard " << shardIndex; + return; } - else - { - JLOG(j_.debug()) << - "Shard " << shardIndex << " downloaded and imported"; - } - remove(shardIndex); + + JLOG(j_.debug()) << + "Shard " << shardIndex << " downloaded and imported"; } void -ShardArchiveHandler::remove(std::uint32_t shardIndex) +ShardArchiveHandler::remove(std::lock_guard&) { + auto const shardIndex {archives_.begin()->first}; app_.getShardStore()->removePreShard(shardIndex); archives_.erase(shardIndex);