Improve shard downloader status reporting

This commit is contained in:
Miguel Portilla
2019-01-10 17:32:08 -05:00
committed by Nik Bougalis
parent 56bc2a2ade
commit 08371ba2c4
12 changed files with 270 additions and 192 deletions

View File

@@ -225,7 +225,7 @@ public:
reset () override reset () override
{ {
{ {
std::lock_guard<std::mutex> l(maxSeqLock); std::lock_guard<std::mutex> lock(maxSeqLock);
maxSeq = 0; maxSeq = 0;
} }
fullbelow_.reset(); fullbelow_.reset();

View File

@@ -80,6 +80,13 @@ private:
boost::filesystem::path dstPath, boost::filesystem::path dstPath,
std::function<void(boost::filesystem::path)> complete, std::function<void(boost::filesystem::path)> complete,
boost::asio::yield_context yield); boost::asio::yield_context yield);
void
fail(
boost::filesystem::path dstPath,
std::function<void(boost::filesystem::path)> const& complete,
boost::system::error_code const& ec,
std::string const& errMsg);
}; };
} // ripple } // ripple

View File

@@ -132,31 +132,21 @@ SSLHTTPDownloader::do_session(
using namespace boost::beast; using namespace boost::beast;
boost::system::error_code ec; boost::system::error_code ec;
auto fail = [&](std::string errMsg)
{
if (ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error()) <<
errMsg << ": " << ec.message();
}
try
{
remove(dstPath);
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
}
complete(std::move(dstPath));
};
ip::tcp::resolver resolver {strand_.get_io_context()}; ip::tcp::resolver resolver {strand_.get_io_context()};
auto const results = resolver.async_resolve(host, port, yield[ec]); auto const results = resolver.async_resolve(host, port, yield[ec]);
if (ec) if (ec)
return fail("async_resolve"); return fail(dstPath, complete, ec, "async_resolve");
try
{
stream_.emplace(strand_.get_io_service(), ctx_);
}
catch (std::exception const& e)
{
return fail(dstPath, complete, ec,
std::string("exception: ") + e.what());
}
stream_.emplace(strand_.get_io_service(), ctx_);
if (ssl_verify_) if (ssl_verify_)
{ {
// If we intend to verify the SSL connection, we need to set the // If we intend to verify the SSL connection, we need to set the
@@ -165,36 +155,36 @@ SSLHTTPDownloader::do_session(
{ {
ec.assign(static_cast<int>( ec.assign(static_cast<int>(
::ERR_get_error()), boost::asio::error::get_ssl_category()); ::ERR_get_error()), boost::asio::error::get_ssl_category());
return fail("SSL_set_tlsext_host_name"); return fail(dstPath, complete, ec, "SSL_set_tlsext_host_name");
} }
} }
else else
{ {
stream_->set_verify_mode(boost::asio::ssl::verify_none, ec); stream_->set_verify_mode(boost::asio::ssl::verify_none, ec);
if (ec) if (ec)
return fail("set_verify_mode"); return fail(dstPath, complete, ec, "set_verify_mode");
} }
boost::asio::async_connect( boost::asio::async_connect(
stream_->next_layer(), results.begin(), results.end(), yield[ec]); stream_->next_layer(), results.begin(), results.end(), yield[ec]);
if (ec) if (ec)
return fail("async_connect"); return fail(dstPath, complete, ec, "async_connect");
if (ssl_verify_) if (ssl_verify_)
{ {
stream_->set_verify_mode(boost::asio::ssl::verify_peer, ec); stream_->set_verify_mode(boost::asio::ssl::verify_peer, ec);
if (ec) if (ec)
return fail("set_verify_mode"); return fail(dstPath, complete, ec, "set_verify_mode");
stream_->set_verify_callback( stream_->set_verify_callback(
boost::asio::ssl::rfc2818_verification(host.c_str()), ec); boost::asio::ssl::rfc2818_verification(host.c_str()), ec);
if (ec) if (ec)
return fail("set_verify_callback"); return fail(dstPath, complete, ec, "set_verify_callback");
} }
stream_->async_handshake(ssl::stream_base::client, yield[ec]); stream_->async_handshake(ssl::stream_base::client, yield[ec]);
if (ec) if (ec)
return fail("async_handshake"); return fail(dstPath, complete, ec, "async_handshake");
// Set up an HTTP HEAD request message to find the file size // Set up an HTTP HEAD request message to find the file size
http::request<http::empty_body> req {http::verb::head, target, version}; http::request<http::empty_body> req {http::verb::head, target, version};
@@ -203,7 +193,7 @@ SSLHTTPDownloader::do_session(
http::async_write(*stream_, req, yield[ec]); http::async_write(*stream_, req, yield[ec]);
if(ec) if(ec)
return fail("async_write"); return fail(dstPath, complete, ec, "async_write");
{ {
// Check if available storage for file size // Check if available storage for file size
@@ -211,19 +201,21 @@ SSLHTTPDownloader::do_session(
p.skip(true); p.skip(true);
http::async_read(*stream_, read_buf_, p, yield[ec]); http::async_read(*stream_, read_buf_, p, yield[ec]);
if(ec) if(ec)
return fail("async_read"); return fail(dstPath, complete, ec, "async_read");
if (auto len = p.content_length()) if (auto len = p.content_length())
{ {
try try
{ {
if (*len > space(dstPath.parent_path()).available) if (*len > space(dstPath.parent_path()).available)
return fail("Insufficient disk space for download"); {
return fail(dstPath, complete, ec,
"Insufficient disk space for download");
}
} }
catch (std::exception const& e) catch (std::exception const& e)
{ {
JLOG(j_.error()) << return fail(dstPath, complete, ec,
"exception: " << e.what(); std::string("exception: ") + e.what());
return fail({});
} }
} }
} }
@@ -232,7 +224,7 @@ SSLHTTPDownloader::do_session(
req.method(http::verb::get); req.method(http::verb::get);
http::async_write(*stream_, req, yield[ec]); http::async_write(*stream_, req, yield[ec]);
if(ec) if(ec)
return fail("async_write"); return fail(dstPath, complete, ec, "async_write");
// Download the file // Download the file
http::response_parser<http::file_body> p; http::response_parser<http::file_body> p;
@@ -244,14 +236,14 @@ SSLHTTPDownloader::do_session(
if (ec) if (ec)
{ {
p.get().body().close(); p.get().body().close();
return fail("open"); return fail(dstPath, complete, ec, "open");
} }
http::async_read(*stream_, read_buf_, p, yield[ec]); http::async_read(*stream_, read_buf_, p, yield[ec]);
if (ec) if (ec)
{ {
p.get().body().close(); p.get().body().close();
return fail("async_read"); return fail(dstPath, complete, ec, "async_read");
} }
p.get().body().close(); p.get().body().close();
@@ -266,6 +258,7 @@ SSLHTTPDownloader::do_session(
JLOG(j_.trace()) << JLOG(j_.trace()) <<
"async_shutdown: " << ec.message(); "async_shutdown: " << ec.message();
} }
// The socket cannot be reused
stream_ = boost::none; stream_ = boost::none;
JLOG(j_.trace()) << JLOG(j_.trace()) <<
@@ -275,4 +268,36 @@ SSLHTTPDownloader::do_session(
complete(std::move(dstPath)); complete(std::move(dstPath));
} }
void
SSLHTTPDownloader::fail(
boost::filesystem::path dstPath,
std::function<void(boost::filesystem::path)> const& complete,
boost::system::error_code const& ec,
std::string const& errMsg)
{
if (!ec)
{
JLOG(j_.error()) <<
errMsg;
}
else if (ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error()) <<
errMsg << ": " << ec.message();
}
try
{
remove(dstPath);
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
}
complete(std::move(dstPath));
}
}// ripple }// ripple

View File

@@ -99,11 +99,11 @@ public:
/** Get shard indexes being imported /** Get shard indexes being imported
@return The number of shards prepared for import @return a string representing the shards prepared for import
*/ */
virtual virtual
std::uint32_t std::string
getNumPreShard() = 0; getPreShards() = 0;
/** Import a shard into the shard database /** Import a shard into the shard database

View File

@@ -63,7 +63,7 @@ Database::~Database()
void void
Database::waitReads() Database::waitReads()
{ {
std::unique_lock<std::mutex> l(readLock_); std::unique_lock<std::mutex> lock(readLock_);
// Wake in two generations. // Wake in two generations.
// Each generation is a full pass over the space. // Each generation is a full pass over the space.
// If we're in generation N and you issue a request, // If we're in generation N and you issue a request,
@@ -75,7 +75,7 @@ Database::waitReads()
// you know the request is done. // you know the request is done.
std::uint64_t const wakeGen = readGen_ + 2; std::uint64_t const wakeGen = readGen_ + 2;
while (! readShut_ && ! read_.empty() && (readGen_ < wakeGen)) while (! readShut_ && ! read_.empty() && (readGen_ < wakeGen))
readGenCondVar_.wait(l); readGenCondVar_.wait(lock);
} }
void void
@@ -91,7 +91,7 @@ void
Database::stopThreads() Database::stopThreads()
{ {
{ {
std::lock_guard <std::mutex> l(readLock_); std::lock_guard <std::mutex> lock(readLock_);
if (readShut_) // Only stop threads once. if (readShut_) // Only stop threads once.
return; return;
@@ -110,7 +110,7 @@ Database::asyncFetch(uint256 const& hash, std::uint32_t seq,
std::shared_ptr<KeyCache<uint256>> const& nCache) std::shared_ptr<KeyCache<uint256>> const& nCache)
{ {
// Post a read // Post a read
std::lock_guard <std::mutex> l(readLock_); std::lock_guard <std::mutex> lock(readLock_);
if (read_.emplace(hash, std::make_tuple(seq, pCache, nCache)).second) if (read_.emplace(hash, std::make_tuple(seq, pCache, nCache)).second)
readCondVar_.notify_one(); readCondVar_.notify_one();
} }
@@ -358,12 +358,12 @@ Database::threadEntry()
std::shared_ptr<TaggedCache<uint256, NodeObject>> lastPcache; std::shared_ptr<TaggedCache<uint256, NodeObject>> lastPcache;
std::shared_ptr<KeyCache<uint256>> lastNcache; std::shared_ptr<KeyCache<uint256>> lastNcache;
{ {
std::unique_lock<std::mutex> l(readLock_); std::unique_lock<std::mutex> lock(readLock_);
while (! readShut_ && read_.empty()) while (! readShut_ && read_.empty())
{ {
// All work is done // All work is done
readGenCondVar_.notify_all(); readGenCondVar_.notify_all();
readCondVar_.wait(l); readCondVar_.wait(lock);
} }
if (readShut_) if (readShut_)
break; break;

View File

@@ -64,7 +64,7 @@ bool
DatabaseShardImp::init() DatabaseShardImp::init()
{ {
using namespace boost::filesystem; using namespace boost::filesystem;
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
if (init_) if (init_)
{ {
assert(false); assert(false);
@@ -173,7 +173,7 @@ DatabaseShardImp::init()
std::max<std::uint64_t>(1, maxDiskSpace_ / avgShardSz_)); std::max<std::uint64_t>(1, maxDiskSpace_ / avgShardSz_));
} }
else else
updateStats(l); updateStats(lock);
init_ = true; init_ = true;
return true; return true;
} }
@@ -181,7 +181,7 @@ DatabaseShardImp::init()
boost::optional<std::uint32_t> boost::optional<std::uint32_t>
DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
{ {
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
if (incomplete_) if (incomplete_)
return incomplete_->prepare(); return incomplete_->prepare();
@@ -206,7 +206,7 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
} }
} }
auto const shardIndex {findShardIndexToAdd(validLedgerSeq, l)}; auto const shardIndex {findShardIndexToAdd(validLedgerSeq, lock)};
if (!shardIndex) if (!shardIndex)
{ {
JLOG(j_.debug()) << JLOG(j_.debug()) <<
@@ -232,7 +232,7 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
bool bool
DatabaseShardImp::prepareShard(std::uint32_t shardIndex) DatabaseShardImp::prepareShard(std::uint32_t shardIndex)
{ {
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
if (!canAdd_) if (!canAdd_)
{ {
@@ -316,18 +316,25 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex)
void void
DatabaseShardImp::removePreShard(std::uint32_t shardIndex) DatabaseShardImp::removePreShard(std::uint32_t shardIndex)
{ {
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
preShards_.erase(shardIndex); preShards_.erase(shardIndex);
} }
std::uint32_t std::string
DatabaseShardImp::getNumPreShard() DatabaseShardImp::getPreShards()
{ {
std::lock_guard<std::mutex> l(m_); RangeSet<std::uint32_t> rs;
assert(init_); {
return preShards_.size(); std::lock_guard<std::mutex> lock(m_);
} assert(init_);
if (preShards_.empty())
return {};
for (auto const& ps : preShards_)
rs.insert(ps.first);
}
return to_string(rs);
};
bool bool
DatabaseShardImp::importShard(std::uint32_t shardIndex, DatabaseShardImp::importShard(std::uint32_t shardIndex,
@@ -365,7 +372,7 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
return true; return true;
}; };
std::unique_lock<std::mutex> l(m_); std::unique_lock<std::mutex> lock(m_);
assert(init_); assert(init_);
// Check shard is prepared // Check shard is prepared
@@ -414,9 +421,9 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
// Shard validation requires releasing the lock // Shard validation requires releasing the lock
// so the database can fetch data from it // so the database can fetch data from it
it->second = shard.get(); it->second = shard.get();
l.unlock(); lock.unlock();
auto const valid {shard->validate(app_)}; auto const valid {shard->validate(app_)};
l.lock(); lock.lock();
if (!valid) if (!valid)
{ {
it = preShards_.find(shardIndex); it = preShards_.find(shardIndex);
@@ -491,7 +498,7 @@ DatabaseShardImp::setStored(std::shared_ptr<Ledger const> const& ledger)
return; return;
} }
auto const shardIndex {seqToShardIndex(ledger->info().seq)}; auto const shardIndex {seqToShardIndex(ledger->info().seq)};
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
if (!incomplete_ || shardIndex != incomplete_->index()) if (!incomplete_ || shardIndex != incomplete_->index())
{ {
@@ -514,7 +521,7 @@ DatabaseShardImp::setStored(std::shared_ptr<Ledger const> const& ledger)
{ {
complete_.emplace(incomplete_->index(), std::move(incomplete_)); complete_.emplace(incomplete_->index(), std::move(incomplete_));
incomplete_.reset(); incomplete_.reset();
updateStats(l); updateStats(lock);
// Update peers with new shard index // Update peers with new shard index
protocol::TMShardInfo message; protocol::TMShardInfo message;
@@ -530,7 +537,7 @@ bool
DatabaseShardImp::contains(std::uint32_t seq) DatabaseShardImp::contains(std::uint32_t seq)
{ {
auto const shardIndex {seqToShardIndex(seq)}; auto const shardIndex {seqToShardIndex(seq)};
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
if (complete_.find(shardIndex) != complete_.end()) if (complete_.find(shardIndex) != complete_.end())
return true; return true;
@@ -542,7 +549,7 @@ DatabaseShardImp::contains(std::uint32_t seq)
std::string std::string
DatabaseShardImp::getCompleteShards() DatabaseShardImp::getCompleteShards()
{ {
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
return status_; return status_;
} }
@@ -551,7 +558,7 @@ void
DatabaseShardImp::validate() DatabaseShardImp::validate()
{ {
{ {
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
if (complete_.empty() && !incomplete_) if (complete_.empty() && !incomplete_)
{ {
@@ -586,7 +593,7 @@ DatabaseShardImp::validate()
void void
DatabaseShardImp::import(Database& source) DatabaseShardImp::import(Database& source)
{ {
std::unique_lock<std::mutex> l(m_); std::unique_lock<std::mutex> lock(m_);
assert(init_); assert(init_);
// Only the application local node store can be imported // Only the application local node store can be imported
@@ -772,7 +779,7 @@ DatabaseShardImp::import(Database& source)
complete_.clear(); complete_.clear();
incomplete_.reset(); incomplete_.reset();
usedDiskSpace_ = 0; usedDiskSpace_ = 0;
l.unlock(); lock.unlock();
if (!init()) if (!init())
Throw<std::runtime_error>("Failed to initialize"); Throw<std::runtime_error>("Failed to initialize");
@@ -783,7 +790,7 @@ DatabaseShardImp::getWriteLoad() const
{ {
std::int32_t wl {0}; std::int32_t wl {0};
{ {
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
for (auto const& c : complete_) for (auto const& c : complete_)
wl += c.second->getBackend()->getWriteLoad(); wl += c.second->getBackend()->getWriteLoad();
@@ -803,7 +810,7 @@ DatabaseShardImp::store(NodeObjectType type,
std::shared_ptr<NodeObject> nObj; std::shared_ptr<NodeObject> nObj;
auto const shardIndex {seqToShardIndex(seq)}; auto const shardIndex {seqToShardIndex(seq)};
{ {
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
if (!incomplete_ || shardIndex != incomplete_->index()) if (!incomplete_ || shardIndex != incomplete_->index())
{ {
@@ -851,7 +858,7 @@ bool
DatabaseShardImp::copyLedger(std::shared_ptr<Ledger const> const& ledger) DatabaseShardImp::copyLedger(std::shared_ptr<Ledger const> const& ledger)
{ {
auto const shardIndex {seqToShardIndex(ledger->info().seq)}; auto const shardIndex {seqToShardIndex(ledger->info().seq)};
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
if (!incomplete_ || shardIndex != incomplete_->index()) if (!incomplete_ || shardIndex != incomplete_->index())
{ {
@@ -881,7 +888,7 @@ DatabaseShardImp::copyLedger(std::shared_ptr<Ledger const> const& ledger)
{ {
complete_.emplace(incomplete_->index(), std::move(incomplete_)); complete_.emplace(incomplete_->index(), std::move(incomplete_));
incomplete_.reset(); incomplete_.reset();
updateStats(l); updateStats(lock);
} }
return true; return true;
} }
@@ -891,7 +898,7 @@ DatabaseShardImp::getDesiredAsyncReadCount(std::uint32_t seq)
{ {
auto const shardIndex {seqToShardIndex(seq)}; auto const shardIndex {seqToShardIndex(seq)};
{ {
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
auto it = complete_.find(shardIndex); auto it = complete_.find(shardIndex);
if (it != complete_.end()) if (it != complete_.end())
@@ -907,7 +914,7 @@ DatabaseShardImp::getCacheHitRate()
{ {
float sz, f {0}; float sz, f {0};
{ {
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
sz = complete_.size(); sz = complete_.size();
for (auto const& c : complete_) for (auto const& c : complete_)
@@ -924,11 +931,11 @@ DatabaseShardImp::getCacheHitRate()
void void
DatabaseShardImp::tune(int size, std::chrono::seconds age) DatabaseShardImp::tune(int size, std::chrono::seconds age)
{ {
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
cacheSz_ = size; cacheSz_ = size;
cacheAge_ = age; cacheAge_ = age;
int const sz {calcTargetCacheSz(l)}; int const sz {calcTargetCacheSz(lock)};
for (auto const& c : complete_) for (auto const& c : complete_)
{ {
c.second->pCache()->setTargetSize(sz); c.second->pCache()->setTargetSize(sz);
@@ -948,9 +955,9 @@ DatabaseShardImp::tune(int size, std::chrono::seconds age)
void void
DatabaseShardImp::sweep() DatabaseShardImp::sweep()
{ {
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
int const sz {calcTargetCacheSz(l)}; int const sz {calcTargetCacheSz(lock)};
for (auto const& c : complete_) for (auto const& c : complete_)
{ {
c.second->pCache()->sweep(); c.second->pCache()->sweep();
@@ -971,19 +978,19 @@ std::shared_ptr<NodeObject>
DatabaseShardImp::fetchFrom(uint256 const& hash, std::uint32_t seq) DatabaseShardImp::fetchFrom(uint256 const& hash, std::uint32_t seq)
{ {
auto const shardIndex {seqToShardIndex(seq)}; auto const shardIndex {seqToShardIndex(seq)};
std::unique_lock<std::mutex> l(m_); std::unique_lock<std::mutex> lock(m_);
assert(init_); assert(init_);
{ {
auto it = complete_.find(shardIndex); auto it = complete_.find(shardIndex);
if (it != complete_.end()) if (it != complete_.end())
{ {
l.unlock(); lock.unlock();
return fetchInternal(hash, *it->second->getBackend()); return fetchInternal(hash, *it->second->getBackend());
} }
} }
if (incomplete_ && incomplete_->index() == shardIndex) if (incomplete_ && incomplete_->index() == shardIndex)
{ {
l.unlock(); lock.unlock();
return fetchInternal(hash, *incomplete_->getBackend()); return fetchInternal(hash, *incomplete_->getBackend());
} }
@@ -991,7 +998,7 @@ DatabaseShardImp::fetchFrom(uint256 const& hash, std::uint32_t seq)
auto it = preShards_.find(shardIndex); auto it = preShards_.find(shardIndex);
if (it != preShards_.end() && it->second) if (it != preShards_.end() && it->second)
{ {
l.unlock(); lock.unlock();
return fetchInternal(hash, *it->second->getBackend()); return fetchInternal(hash, *it->second->getBackend());
} }
return {}; return {};
@@ -1109,7 +1116,7 @@ std::pair<std::shared_ptr<PCache>, std::shared_ptr<NCache>>
DatabaseShardImp::selectCache(std::uint32_t seq) DatabaseShardImp::selectCache(std::uint32_t seq)
{ {
auto const shardIndex {seqToShardIndex(seq)}; auto const shardIndex {seqToShardIndex(seq)};
std::lock_guard<std::mutex> l(m_); std::lock_guard<std::mutex> lock(m_);
assert(init_); assert(init_);
{ {
auto it = complete_.find(shardIndex); auto it = complete_.find(shardIndex);

View File

@@ -51,8 +51,8 @@ public:
void void
removePreShard(std::uint32_t shardIndex) override; removePreShard(std::uint32_t shardIndex) override;
std::uint32_t std::string
getNumPreShard() override; getPreShards() override;
bool bool
importShard(std::uint32_t shardIndex, importShard(std::uint32_t shardIndex,

View File

@@ -359,10 +359,8 @@ PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const
return true; return true;
} }
if (seq >= app_.getNodeStore().earliestSeq()) return seq >= app_.getNodeStore().earliestSeq() &&
return hasShard( hasShard(NodeStore::seqToShardIndex(seq));
(seq - 1) / NodeStore::DatabaseShard::ledgersPerShardDefault);
return false;
} }
void void

View File

@@ -46,48 +46,44 @@ public:
~ShardArchiveHandler(); ~ShardArchiveHandler();
/** Initializes the handler. /** Add an archive to be downloaded and imported.
@return `true` if successfully initialized.
*/
bool
init();
/** Queue an archive to be downloaded and imported.
@param shardIndex the index of the shard to be imported. @param shardIndex the index of the shard to be imported.
@param url the location of the archive. @param url the location of the archive.
@return `true` if successfully added. @return `true` if successfully added.
@note Returns false if called while downloading.
*/ */
bool bool
add(std::uint32_t shardIndex, parsedURL&& url); add(std::uint32_t shardIndex, parsedURL&& url);
/** Starts downloading and importing of queued archives. */ /** Starts downloading and importing archives. */
void bool
next(); start();
/** Returns indexes of queued archives.
@return indexes of queued archives.
*/
std::string
toString() const;
private: private:
// The callback used by the downloader to notify completion of a download. // Begins the download and import of the next archive.
bool
next(std::lock_guard<std::mutex>& l);
// Callback used by the downloader to notify completion of a download.
void void
complete(boost::filesystem::path dstPath); complete(boost::filesystem::path dstPath);
// A job to extract an archive and import a shard. // Extract a downloaded archive and import it into the shard store.
void void
process(boost::filesystem::path const& dstPath); process(boost::filesystem::path const& dstPath);
// Remove the archive being processed.
void void
remove(std::uint32_t shardIndex); remove(std::lock_guard<std::mutex>&);
std::mutex mutable m_;
Application& app_; Application& app_;
std::shared_ptr<SSLHTTPDownloader> downloader_; std::shared_ptr<SSLHTTPDownloader> downloader_;
std::map<std::uint32_t, parsedURL> archives_;
bool const validate_;
boost::filesystem::path const downloadDir_; boost::filesystem::path const downloadDir_;
bool const validate_;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_; boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_;
bool process_;
std::map<std::uint32_t, parsedURL> archives_;
beast::Journal j_; beast::Journal j_;
}; };

View File

@@ -57,9 +57,15 @@ doDownloadShard(RPC::Context& context)
if (!shardStore) if (!shardStore)
return rpcError(rpcNOT_ENABLED); return rpcError(rpcNOT_ENABLED);
// Deny request if already downloading // Return status update if already downloading
if (shardStore->getNumPreShard()) auto preShards {shardStore->getPreShards()};
return rpcError(rpcTOO_BUSY); if (!preShards.empty())
{
std::string s {"Download in progress. Shard"};
if (!std::all_of(preShards.begin(), preShards.end(), ::isdigit))
s += "s";
return RPC::makeObjectValue(s + " " + preShards);
}
if (!context.params.isMember(jss::shards)) if (!context.params.isMember(jss::shards))
return RPC::missing_field_error(jss::shards); return RPC::missing_field_error(jss::shards);
@@ -132,8 +138,6 @@ doDownloadShard(RPC::Context& context)
// Begin downloading. The handler keeps itself alive while downloading. // Begin downloading. The handler keeps itself alive while downloading.
auto handler { auto handler {
std::make_shared<RPC::ShardArchiveHandler>(context.app, validate)}; std::make_shared<RPC::ShardArchiveHandler>(context.app, validate)};
if (!handler->init())
return rpcError(rpcINTERNAL);
for (auto& ar : archives) for (auto& ar : archives)
{ {
if (!handler->add(ar.first, std::move(ar.second))) if (!handler->add(ar.first, std::move(ar.second)))
@@ -143,9 +147,14 @@ doDownloadShard(RPC::Context& context)
std::to_string(ar.first) + " exists or being acquired"); std::to_string(ar.first) + " exists or being acquired");
} }
} }
handler->next(); if (!handler->start())
return rpcError(rpcINTERNAL);
return RPC::makeObjectValue("downloading shards " + handler->toString()); std::string s {"Downloading shard"};
preShards = shardStore->getPreShards();
if (!std::all_of(preShards.begin(), preShards.end(), ::isdigit))
s += "s";
return RPC::makeObjectValue(s + " " + preShards);
} }
} // ripple } // ripple

View File

@@ -152,7 +152,7 @@ bool
ServerHandlerImp::onAccept (Session& session, ServerHandlerImp::onAccept (Session& session,
boost::asio::ip::tcp::endpoint endpoint) boost::asio::ip::tcp::endpoint endpoint)
{ {
std::lock_guard<std::mutex> l(countlock_); std::lock_guard<std::mutex> lock(countlock_);
auto const c = ++count_[session.port()]; auto const c = ++count_[session.port()];
@@ -363,7 +363,7 @@ void
ServerHandlerImp::onClose (Session& session, ServerHandlerImp::onClose (Session& session,
boost::system::error_code const&) boost::system::error_code const&)
{ {
std::lock_guard<std::mutex> l(countlock_); std::lock_guard<std::mutex> lock(countlock_);
--count_[session.port()]; --count_[session.port()];
} }

View File

@@ -33,10 +33,11 @@ using namespace std::chrono_literals;
ShardArchiveHandler::ShardArchiveHandler(Application& app, bool validate) ShardArchiveHandler::ShardArchiveHandler(Application& app, bool validate)
: app_(app) : app_(app)
, validate_(validate)
, downloadDir_(get(app_.config().section( , downloadDir_(get(app_.config().section(
ConfigSection::shardDatabase()), "path", "") + "/download") ConfigSection::shardDatabase()), "path", "") + "/download")
, validate_(validate)
, timer_(app_.getIOService()) , timer_(app_.getIOService())
, process_(false)
, j_(app.journal("ShardArchiveHandler")) , j_(app.journal("ShardArchiveHandler"))
{ {
assert(app_.getShardStore()); assert(app_.getShardStore());
@@ -44,9 +45,11 @@ ShardArchiveHandler::ShardArchiveHandler(Application& app, bool validate)
ShardArchiveHandler::~ShardArchiveHandler() ShardArchiveHandler::~ShardArchiveHandler()
{ {
std::lock_guard<std::mutex> lock(m_);
timer_.cancel(); timer_.cancel();
for (auto const& ar : archives_) for (auto const& ar : archives_)
app_.getShardStore()->removePreShard(ar.first); app_.getShardStore()->removePreShard(ar.first);
archives_.clear();
// Remove temp root download directory // Remove temp root download directory
try try
@@ -61,18 +64,45 @@ ShardArchiveHandler::~ShardArchiveHandler()
} }
bool bool
ShardArchiveHandler::init() ShardArchiveHandler::add(std::uint32_t shardIndex, parsedURL&& url)
{ {
std::lock_guard<std::mutex> lock(m_);
if (process_)
{
JLOG(j_.error()) <<
"Download and import already in progress";
return false;
}
auto const it {archives_.find(shardIndex)};
if (it != archives_.end())
return url == it->second;
if (!app_.getShardStore()->prepareShard(shardIndex))
return false;
archives_.emplace(shardIndex, std::move(url));
return true;
}
bool
ShardArchiveHandler::start()
{
std::lock_guard<std::mutex> lock(m_);
if (!app_.getShardStore()) if (!app_.getShardStore())
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"No shard store available"; "No shard store available";
return false; return false;
} }
if (downloader_) if (process_)
{ {
JLOG(j_.error()) << JLOG(j_.warn()) <<
"Already initialized"; "Archives already being processed";
return false;
}
if (archives_.empty())
{
JLOG(j_.warn()) <<
"No archives to process";
return false; return false;
} }
@@ -91,36 +121,31 @@ ShardArchiveHandler::init()
return false; return false;
} }
downloader_ = std::make_shared<SSLHTTPDownloader>( if (!downloader_)
app_.getIOService(), j_); {
return downloader_->init(app_.config()); downloader_ = std::make_shared<SSLHTTPDownloader>(
app_.getIOService(), j_);
if (!downloader_->init(app_.config()))
{
downloader_.reset();
return false;
}
}
return next(lock);
} }
bool bool
ShardArchiveHandler::add(std::uint32_t shardIndex, parsedURL&& url) ShardArchiveHandler::next(std::lock_guard<std::mutex>& l)
{ {
assert(downloader_);
auto const it {archives_.find(shardIndex)};
if (it != archives_.end())
return url == it->second;
if (!app_.getShardStore()->prepareShard(shardIndex))
return false;
archives_.emplace(shardIndex, std::move(url));
return true;
}
void
ShardArchiveHandler::next()
{
assert(downloader_);
// Check if all archives completed
if (archives_.empty()) if (archives_.empty())
return; {
process_ = false;
return false;
}
// Create a temp archive directory at the root // Create a temp archive directory at the root
auto const dstDir { auto const shardIndex {archives_.begin()->first};
downloadDir_ / std::to_string(archives_.begin()->first)}; auto const dstDir {downloadDir_ / std::to_string(shardIndex)};
try try
{ {
create_directory(dstDir); create_directory(dstDir);
@@ -129,53 +154,55 @@ ShardArchiveHandler::next()
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"exception: " << e.what(); "exception: " << e.what();
remove(archives_.begin()->first); remove(l);
return next(); return next(l);
} }
// Download the archive // Download the archive
auto const& url {archives_.begin()->second}; auto const& url {archives_.begin()->second};
downloader_->download( if (!downloader_->download(
url.domain, url.domain,
std::to_string(url.port.get_value_or(443)), std::to_string(url.port.get_value_or(443)),
url.path, url.path,
11, 11,
dstDir / "archive.tar.lz4", dstDir / "archive.tar.lz4",
std::bind(&ShardArchiveHandler::complete, std::bind(&ShardArchiveHandler::complete,
shared_from_this(), std::placeholders::_1)); shared_from_this(), std::placeholders::_1)))
} {
remove(l);
return next(l);
}
std::string process_ = true;
ShardArchiveHandler::toString() const return true;
{ }
assert(downloader_);
RangeSet<std::uint32_t> rs;
for (auto const& ar : archives_)
rs.insert(ar.first);
return to_string(rs);
};
void void
ShardArchiveHandler::complete(path dstPath) ShardArchiveHandler::complete(path dstPath)
{ {
try
{ {
if (!is_regular_file(dstPath)) std::lock_guard<std::mutex> lock(m_);
try
{ {
auto ar {archives_.begin()}; if (!is_regular_file(dstPath))
JLOG(j_.error()) << {
"Downloading shard id " << ar->first << auto ar {archives_.begin()};
" URL " << ar->second.domain << ar->second.path; JLOG(j_.error()) <<
remove(ar->first); "Downloading shard id " << ar->first <<
return next(); " form URL " << ar->second.domain << ar->second.path;
remove(lock);
next(lock);
return;
}
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
remove(lock);
next(lock);
return;
} }
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
remove(archives_.begin()->first);
return next();
} }
// Process in another thread to not hold up the IO service // Process in another thread to not hold up the IO service
@@ -187,8 +214,9 @@ ShardArchiveHandler::complete(path dstPath)
auto const mode {ptr->app_.getOPs().getOperatingMode()}; auto const mode {ptr->app_.getOPs().getOperatingMode()};
if (ptr->validate_ && mode != NetworkOPs::omFULL) if (ptr->validate_ && mode != NetworkOPs::omFULL)
{ {
timer_.expires_from_now(std::chrono::seconds{ std::lock_guard<std::mutex> lock(m_);
(NetworkOPs::omFULL - mode) * 10}); timer_.expires_from_now(static_cast<std::chrono::seconds>(
(NetworkOPs::omFULL - mode) * 10));
timer_.async_wait( timer_.async_wait(
[=, dstPath = std::move(dstPath), ptr = std::move(ptr)] [=, dstPath = std::move(dstPath), ptr = std::move(ptr)]
(boost::system::error_code const& ec) (boost::system::error_code const& ec)
@@ -196,21 +224,30 @@ ShardArchiveHandler::complete(path dstPath)
if (ec != boost::asio::error::operation_aborted) if (ec != boost::asio::error::operation_aborted)
ptr->complete(std::move(dstPath)); ptr->complete(std::move(dstPath));
}); });
return;
} }
ptr->process(dstPath); else
ptr->next(); {
ptr->process(dstPath);
std::lock_guard<std::mutex> lock(m_);
remove(lock);
next(lock);
}
}); });
} }
void void
ShardArchiveHandler::process(path const& dstPath) ShardArchiveHandler::process(path const& dstPath)
{ {
auto const shardIndex {archives_.begin()->first}; std::uint32_t shardIndex;
{
std::lock_guard<std::mutex> lock(m_);
shardIndex = archives_.begin()->first;
}
auto const shardDir {dstPath.parent_path() / std::to_string(shardIndex)}; auto const shardDir {dstPath.parent_path() / std::to_string(shardIndex)};
try try
{ {
// Decompress and extract the downloaded file // Extract the downloaded archive
extractTarLz4(dstPath, dstPath.parent_path()); extractTarLz4(dstPath, dstPath.parent_path());
// The extracted root directory name must match the shard index // The extracted root directory name must match the shard index
@@ -219,14 +256,14 @@ ShardArchiveHandler::process(path const& dstPath)
JLOG(j_.error()) << JLOG(j_.error()) <<
"Shard " << shardIndex << "Shard " << shardIndex <<
" mismatches archive shard directory"; " mismatches archive shard directory";
return remove(shardIndex); return;
} }
} }
catch (std::exception const& e) catch (std::exception const& e)
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"exception: " << e.what(); "exception: " << e.what();
return remove(shardIndex); return;
} }
// Import the shard into the shard store // Import the shard into the shard store
@@ -234,18 +271,17 @@ ShardArchiveHandler::process(path const& dstPath)
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"Importing shard " << shardIndex; "Importing shard " << shardIndex;
return;
} }
else
{ JLOG(j_.debug()) <<
JLOG(j_.debug()) << "Shard " << shardIndex << " downloaded and imported";
"Shard " << shardIndex << " downloaded and imported";
}
remove(shardIndex);
} }
void void
ShardArchiveHandler::remove(std::uint32_t shardIndex) ShardArchiveHandler::remove(std::lock_guard<std::mutex>&)
{ {
auto const shardIndex {archives_.begin()->first};
app_.getShardStore()->removePreShard(shardIndex); app_.getShardStore()->removePreShard(shardIndex);
archives_.erase(shardIndex); archives_.erase(shardIndex);