Introduce ShardArchiveHandler improvements:

* Improve documentation
* Make the ShardArchiveHandler rather than the DatabaseShardImp perform
  LastLedgerHash verification for downloaded shards
* Remove ShardArchiveHandler's singleton implementation and make it an
  Application member
* Have the Application invoke ShardArchiveHandler initialization
  instead of clients
* Add RecoveryHandler as a ShardArchiveHandler derived class
* Improve commenting
This commit is contained in:
Devon White
2020-05-15 17:21:16 -04:00
committed by manojsdoshi
parent 21340a1c1e
commit ac766ec0eb
22 changed files with 977 additions and 926 deletions

View File

@@ -22,7 +22,6 @@
#include <ripple/basics/BasicConfig.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/rpc/ShardArchiveHandler.h>
#include <ripple/rpc/impl/Handler.h>
@@ -34,9 +33,6 @@ namespace RPC {
using namespace boost::filesystem;
using namespace std::chrono_literals;
std::mutex ShardArchiveHandler::instance_mutex_;
ShardArchiveHandler::pointer ShardArchiveHandler::instance_ = nullptr;
boost::filesystem::path
ShardArchiveHandler::getDownloadDirectory(Config const& config)
{
@@ -48,89 +44,97 @@ ShardArchiveHandler::getDownloadDirectory(Config const& config)
"download";
}
auto
ShardArchiveHandler::getInstance() -> pointer
{
std::lock_guard lock(instance_mutex_);
return instance_;
}
auto
ShardArchiveHandler::getInstance(Application& app, Stoppable& parent) -> pointer
{
std::lock_guard lock(instance_mutex_);
assert(!instance_);
instance_.reset(new ShardArchiveHandler(app, parent));
return instance_;
}
auto
ShardArchiveHandler::recoverInstance(Application& app, Stoppable& parent)
-> pointer
{
std::lock_guard lock(instance_mutex_);
assert(!instance_);
instance_.reset(new ShardArchiveHandler(app, parent, true));
return instance_;
}
bool
ShardArchiveHandler::hasInstance()
{
std::lock_guard lock(instance_mutex_);
return instance_.get() != nullptr;
}
ShardArchiveHandler::ShardArchiveHandler(
std::unique_ptr<ShardArchiveHandler>
ShardArchiveHandler::makeShardArchiveHandler(
Application& app,
Stoppable& parent,
bool recovery)
Stoppable& parent)
{
return std::make_unique<ShardArchiveHandler>(app, parent);
}
std::unique_ptr<ShardArchiveHandler>
ShardArchiveHandler::tryMakeRecoveryHandler(Application& app, Stoppable& parent)
{
auto const downloadDir(getDownloadDirectory(app.config()));
// Create the handler iff the database
// is present.
if (exists(downloadDir / stateDBName) &&
is_regular_file(downloadDir / stateDBName))
{
return std::make_unique<RecoveryHandler>(app, parent);
}
return nullptr;
}
ShardArchiveHandler::ShardArchiveHandler(Application& app, Stoppable& parent)
: Stoppable("ShardArchiveHandler", parent)
, process_(false)
, app_(app)
, j_(app.journal("ShardArchiveHandler"))
, downloadDir_(getDownloadDirectory(app.config()))
, timer_(app_.getIOService())
, process_(false)
, verificationScheduler_(
std::chrono::seconds(get<std::uint32_t>(
app.config().section(ConfigSection::shardDatabase()),
"shard_verification_retry_interval")),
get<std::uint32_t>(
app.config().section(ConfigSection::shardDatabase()),
"shard_verification_max_attempts"))
{
assert(app_.getShardStore());
if (recovery)
downloader_.reset(
new DatabaseDownloader(app_.getIOService(), j_, app_.config()));
}
bool
ShardArchiveHandler::init()
{
try
{
create_directories(downloadDir_);
std::lock_guard lock(m_);
sqliteDB_ = std::make_unique<DatabaseCon>(
downloadDir_,
stateDBName,
DownloaderDBPragma,
ShardArchiveHandlerDBInit);
}
catch (std::exception const& e)
if (process_ || downloader_ != nullptr || sqliteDB_ != nullptr)
{
JLOG(j_.error()) << "exception: " << e.what()
<< " in function: " << __func__;
JLOG(j_.warn()) << "Archives already being processed";
return false;
}
// Initialize from pre-existing database
if (exists(downloadDir_ / stateDBName) &&
is_regular_file(downloadDir_ / stateDBName))
{
downloader_.reset(
new DatabaseDownloader(app_.getIOService(), j_, app_.config()));
return initFromDB(lock);
}
// Fresh initialization
else
{
try
{
create_directories(downloadDir_);
sqliteDB_ = std::make_unique<DatabaseCon>(
downloadDir_,
stateDBName,
DownloaderDBPragma,
ShardArchiveHandlerDBInit);
}
catch (std::exception const& e)
{
JLOG(j_.error())
<< "exception: " << e.what() << " in function: " << __func__;
return false;
}
}
return true;
}
bool
ShardArchiveHandler::initFromDB()
ShardArchiveHandler::initFromDB(std::lock_guard<std::mutex> const& lock)
{
try
{
@@ -151,8 +155,6 @@ ShardArchiveHandler::initFromDB()
soci::rowset<soci::row> rs =
(session.prepare << "SELECT * FROM State;");
std::lock_guard<std::mutex> lock(m_);
for (auto it = rs.begin(); it != rs.end(); ++it)
{
parsedURL url;
@@ -190,14 +192,24 @@ ShardArchiveHandler::initFromDB()
void
ShardArchiveHandler::onStop()
{
std::lock_guard<std::mutex> lock(m_);
if (downloader_)
{
downloader_->onStop();
downloader_.reset();
std::lock_guard<std::mutex> lock(m_);
if (downloader_)
{
downloader_->onStop();
downloader_.reset();
}
timer_.cancel();
}
jobCounter_.join(
"ShardArchiveHandler", std::chrono::milliseconds(2000), j_);
timerCounter_.join(
"ShardArchiveHandler", std::chrono::milliseconds(2000), j_);
stopped();
}
@@ -271,7 +283,7 @@ ShardArchiveHandler::start()
if (!downloader_)
{
// will throw if can't initialize ssl context
downloader_ = std::make_shared<DatabaseDownloader>(
downloader_ = std::make_unique<DatabaseDownloader>(
app_.getIOService(), j_, app_.config());
}
}
@@ -281,6 +293,7 @@ ShardArchiveHandler::start()
return false;
}
process_ = true;
return next(lock);
}
@@ -292,7 +305,7 @@ ShardArchiveHandler::release()
}
bool
ShardArchiveHandler::next(std::lock_guard<std::mutex>& l)
ShardArchiveHandler::next(std::lock_guard<std::mutex> const& l)
{
if (archives_.empty())
{
@@ -300,8 +313,51 @@ ShardArchiveHandler::next(std::lock_guard<std::mutex>& l)
return false;
}
// Create a temp archive directory at the root
if (isStopping())
return false;
auto const shardIndex{archives_.begin()->first};
// We use the sequence of the last validated ledger
// to determine whether or not we have stored a ledger
// that comes after the last ledger in this shard. A
// later ledger must be present in order to reliably
// retrieve the hash of the shard's last ledger.
boost::optional<uint256> expectedHash;
bool shouldHaveHash = false;
if (auto const seq = app_.getShardStore()->lastLedgerSeq(shardIndex);
(shouldHaveHash = app_.getLedgerMaster().getValidLedgerIndex() > seq))
{
expectedHash = app_.getLedgerMaster().walkHashBySeq(seq);
}
if (!expectedHash)
{
auto wrapper =
timerCounter_.wrap([this](boost::system::error_code const& ec) {
if (ec != boost::asio::error::operation_aborted)
{
std::lock_guard lock(m_);
this->next(lock);
}
});
if (!wrapper)
return onClosureFailed(
"failed to wrap closure for last ledger confirmation timer", l);
if (!verificationScheduler_.retry(app_, shouldHaveHash, *wrapper))
{
JLOG(j_.error()) << "failed to find last ledger hash for shard "
<< shardIndex << ", maximum attempts reached";
return removeAndProceed(l);
}
return true;
}
// Create a temp archive directory at the root
auto const dstDir{downloadDir_ / std::to_string(shardIndex)};
try
{
@@ -310,42 +366,42 @@ ShardArchiveHandler::next(std::lock_guard<std::mutex>& l)
catch (std::exception const& e)
{
JLOG(j_.error()) << "exception: " << e.what();
remove(l);
return next(l);
return removeAndProceed(l);
}
// Download the archive. Process in another thread
// to prevent holding up the lock if the downloader
// sleeps.
auto const& url{archives_.begin()->second};
app_.getJobQueue().addJob(
jtCLIENT,
"ShardArchiveHandler",
[this, ptr = shared_from_this(), url, dstDir](Job&) {
if (!downloader_->download(
url.domain,
std::to_string(url.port.get_value_or(443)),
url.path,
11,
dstDir / "archive.tar.lz4",
std::bind(
&ShardArchiveHandler::complete,
ptr,
std::placeholders::_1)))
{
std::lock_guard<std::mutex> l(m_);
remove(l);
next(l);
}
});
auto wrapper = jobCounter_.wrap([this, url, dstDir](Job&) {
if (!downloader_->download(
url.domain,
std::to_string(url.port.get_value_or(443)),
url.path,
11,
dstDir / "archive.tar.lz4",
[this](path dstPath) { complete(dstPath); }))
{
std::lock_guard<std::mutex> l(m_);
removeAndProceed(l);
}
});
if (!wrapper)
return onClosureFailed(
"failed to wrap closure for starting download", l);
app_.getJobQueue().addJob(jtCLIENT, "ShardArchiveHandler", *wrapper);
process_ = true;
return true;
}
void
ShardArchiveHandler::complete(path dstPath)
{
if (isStopping())
return;
{
std::lock_guard lock(m_);
try
@@ -354,51 +410,68 @@ ShardArchiveHandler::complete(path dstPath)
{
auto ar{archives_.begin()};
JLOG(j_.error())
<< "Downloading shard id " << ar->first << " form URL "
<< "Downloading shard id " << ar->first << " from URL "
<< ar->second.domain << ar->second.path;
remove(lock);
next(lock);
removeAndProceed(lock);
return;
}
}
catch (std::exception const& e)
{
JLOG(j_.error()) << "exception: " << e.what();
remove(lock);
next(lock);
removeAndProceed(lock);
return;
}
}
// Process in another thread to not hold up the IO service
app_.getJobQueue().addJob(
jtCLIENT,
"ShardArchiveHandler",
[=, dstPath = std::move(dstPath), ptr = shared_from_this()](Job&) {
// If not synced then defer and retry
auto const mode{ptr->app_.getOPs().getOperatingMode()};
if (mode != OperatingMode::FULL)
{
std::lock_guard lock(m_);
timer_.expires_from_now(static_cast<std::chrono::seconds>(
(static_cast<std::size_t>(OperatingMode::FULL) -
static_cast<std::size_t>(mode)) *
10));
timer_.async_wait(
[=, dstPath = std::move(dstPath), ptr = std::move(ptr)](
boost::system::error_code const& ec) {
if (ec != boost::asio::error::operation_aborted)
ptr->complete(std::move(dstPath));
});
}
auto wrapper = jobCounter_.wrap([=, dstPath = std::move(dstPath)](Job&) {
if (isStopping())
return;
// If not synced then defer and retry
auto const mode{app_.getOPs().getOperatingMode()};
if (mode != OperatingMode::FULL)
{
std::lock_guard lock(m_);
timer_.expires_from_now(static_cast<std::chrono::seconds>(
(static_cast<std::size_t>(OperatingMode::FULL) -
static_cast<std::size_t>(mode)) *
10));
auto wrapper =
timerCounter_.wrap([=, dstPath = std::move(dstPath)](
boost::system::error_code const& ec) {
if (ec != boost::asio::error::operation_aborted)
complete(std::move(dstPath));
});
if (!wrapper)
onClosureFailed(
"failed to wrap closure for operating mode timer", lock);
else
{
ptr->process(dstPath);
std::lock_guard lock(m_);
remove(lock);
next(lock);
}
});
timer_.async_wait(*wrapper);
}
else
{
process(dstPath);
std::lock_guard lock(m_);
removeAndProceed(lock);
}
});
if (!wrapper)
{
if (isStopping())
return;
JLOG(j_.error()) << "failed to wrap closure for process()";
std::lock_guard lock(m_);
removeAndProceed(lock);
}
// Process in another thread to not hold up the IO service
app_.getJobQueue().addJob(jtCLIENT, "ShardArchiveHandler", *wrapper);
}
void
@@ -441,8 +514,10 @@ ShardArchiveHandler::process(path const& dstPath)
}
void
ShardArchiveHandler::remove(std::lock_guard<std::mutex>&)
ShardArchiveHandler::remove(std::lock_guard<std::mutex> const&)
{
verificationScheduler_.reset();
auto const shardIndex{archives_.begin()->first};
app_.getShardStore()->removePreShard(shardIndex);
archives_.erase(shardIndex);
@@ -466,8 +541,6 @@ ShardArchiveHandler::remove(std::lock_guard<std::mutex>&)
void
ShardArchiveHandler::doRelease(std::lock_guard<std::mutex> const&)
{
process_ = false;
timer_.cancel();
for (auto const& ar : archives_)
app_.getShardStore()->removePreShard(ar.first);
@@ -493,6 +566,32 @@ ShardArchiveHandler::doRelease(std::lock_guard<std::mutex> const&)
}
downloader_.reset();
process_ = false;
}
bool
ShardArchiveHandler::onClosureFailed(
std::string const& errorMsg,
std::lock_guard<std::mutex> const& lock)
{
if (isStopping())
return false;
JLOG(j_.error()) << errorMsg;
return removeAndProceed(lock);
}
bool
ShardArchiveHandler::removeAndProceed(std::lock_guard<std::mutex> const& lock)
{
remove(lock);
return next(lock);
}
RecoveryHandler::RecoveryHandler(Application& app, Stoppable& parent)
: ShardArchiveHandler(app, parent)
{
}
} // namespace RPC