Enforce a 20s timeout when making validator list requests (RIPD-1737)

This commit is contained in:
Mike Ellery
2019-04-08 12:09:04 -07:00
committed by Nik Bougalis
parent 2e26377e7c
commit dd99bf479f
4 changed files with 329 additions and 146 deletions

View File

@@ -26,15 +26,15 @@
#include <ripple/basics/Slice.h>
#include <ripple/json/json_reader.h>
#include <ripple/protocol/JsonFields.h>
#include <boost/algorithm/clamp.hpp>
#include <boost/regex.hpp>
#include <algorithm>
namespace ripple {
// default site query frequency - 5 minutes
auto constexpr DEFAULT_REFRESH_INTERVAL = std::chrono::minutes{5};
auto constexpr ERROR_RETRY_INTERVAL = std::chrono::seconds{30};
unsigned short constexpr MAX_REDIRECTS = 3;
auto constexpr default_refresh_interval = std::chrono::minutes{5};
auto constexpr error_retry_interval = std::chrono::seconds{30};
unsigned short constexpr max_redirects = 3;
ValidatorSite::Site::Resource::Resource (std::string uri_)
: uri {std::move(uri_)}
@@ -82,7 +82,7 @@ ValidatorSite::Site::Site (std::string uri)
: loadedResource {std::make_shared<Resource>(std::move(uri))}
, startingResource {loadedResource}
, redirCount {0}
, refreshInterval {DEFAULT_REFRESH_INTERVAL}
, refreshInterval {default_refresh_interval}
, nextRefresh {clock_type::now()}
{
}
@@ -90,7 +90,8 @@ ValidatorSite::Site::Site (std::string uri)
ValidatorSite::ValidatorSite (
boost::asio::io_service& ios,
ValidatorList& validators,
beast::Journal j)
beast::Journal j,
std::chrono::seconds timeout)
: ios_ (ios)
, validators_ (validators)
, j_ (j)
@@ -98,6 +99,7 @@ ValidatorSite::ValidatorSite (
, fetching_ (false)
, pending_ (false)
, stopping_ (false)
, requestTimeout_ (timeout)
{
}
@@ -153,7 +155,7 @@ ValidatorSite::start ()
{
std::lock_guard <std::mutex> lock{state_mutex_};
if (timer_.expires_at() == clock_type::time_point{})
setTimer ();
setTimer (lock);
}
void
@@ -168,20 +170,29 @@ ValidatorSite::stop()
{
std::unique_lock<std::mutex> lock{state_mutex_};
stopping_ = true;
cv_.wait(lock, [&]{ return ! fetching_; });
// work::cancel() must be called before the
// cv wait in order to kick any asio async operations
// that might be pending.
if(auto sp = work_.lock())
sp->cancel();
cv_.wait(lock, [&]{ return ! fetching_; });
error_code ec;
timer_.cancel(ec);
// docs indicate cancel() can throw, but this should be
// reconsidered if it changes to noexcept
try
{
timer_.cancel();
}
catch (boost::system::system_error const&)
{
}
stopping_ = false;
pending_ = false;
cv_.notify_all();
}
void
ValidatorSite::setTimer ()
ValidatorSite::setTimer (std::lock_guard<std::mutex>& state_lock)
{
std::lock_guard <std::mutex> lock{sites_mutex_};
@@ -196,8 +207,11 @@ ValidatorSite::setTimer ()
pending_ = next->nextRefresh <= clock_type::now();
cv_.notify_all();
timer_.expires_at (next->nextRefresh);
timer_.async_wait (std::bind (&ValidatorSite::onTimer, this,
std::distance (sites_.begin (), next), std::placeholders::_1));
auto idx = std::distance (sites_.begin (), next);
timer_.async_wait ([this, idx] (boost::system::error_code const& ec)
{
this->onTimer (idx, ec);
});
}
}
@@ -205,22 +219,42 @@ void
ValidatorSite::makeRequest (
std::shared_ptr<Site::Resource> resource,
std::size_t siteIdx,
std::lock_guard<std::mutex>& lock)
std::lock_guard<std::mutex>& sites_lock)
{
fetching_ = true;
sites_[siteIdx].activeResource = resource;
std::shared_ptr<detail::Work> sp;
auto onFetch =
[this, siteIdx] (error_code const& err, detail::response_type&& resp)
auto timeoutCancel =
[this] ()
{
std::lock_guard <std::mutex> lock_state{state_mutex_};
// docs indicate cancel_one() can throw, but this
// should be reconsidered if it changes to noexcept
try
{
timer_.cancel_one();
}
catch (boost::system::system_error const&)
{
}
};
auto onFetch =
[this, siteIdx, timeoutCancel] (
error_code const& err, detail::response_type&& resp)
{
timeoutCancel ();
onSiteFetch (err, std::move(resp), siteIdx);
};
auto onFetchFile =
[this, siteIdx] (error_code const& err, std::string const& resp)
{
onTextFetch (err, resp, siteIdx);
};
[this, siteIdx, timeoutCancel] (
error_code const& err, std::string const& resp)
{
timeoutCancel ();
onTextFetch (err, resp, siteIdx);
};
JLOG (j_.debug()) << "Starting request for " << resource->uri;
if (resource->pUrl.scheme == "https")
{
@@ -252,6 +286,34 @@ ValidatorSite::makeRequest (
work_ = sp;
sp->run ();
// start a timer for the request, which shouldn't take more
// than requestTimeout_ to complete
std::lock_guard <std::mutex> lock_state{state_mutex_};
timer_.expires_after (requestTimeout_);
timer_.async_wait ([this, siteIdx] (boost::system::error_code const& ec)
{
this->onRequestTimeout (siteIdx, ec);
});
}
void
ValidatorSite::onRequestTimeout (
std::size_t siteIdx,
error_code const& ec)
{
if (ec)
return;
{
std::lock_guard <std::mutex> lock_site{sites_mutex_};
JLOG (j_.warn()) <<
"Request for " << sites_[siteIdx].activeResource->uri <<
" took too long";
}
std::lock_guard<std::mutex> lock_state{state_mutex_};
if(auto sp = work_.lock())
sp->cancel();
}
void
@@ -268,14 +330,12 @@ ValidatorSite::onTimer (
return;
}
std::lock_guard <std::mutex> lock{sites_mutex_};
sites_[siteIdx].nextRefresh =
clock_type::now() + sites_[siteIdx].refreshInterval;
assert(! fetching_);
sites_[siteIdx].redirCount = 0;
try
{
std::lock_guard <std::mutex> lock{sites_mutex_};
sites_[siteIdx].nextRefresh =
clock_type::now() + sites_[siteIdx].refreshInterval;
sites_[siteIdx].redirCount = 0;
// the WorkSSL client can throw if SSL init fails
makeRequest(sites_[siteIdx].startingResource, siteIdx, lock);
}
@@ -292,7 +352,7 @@ void
ValidatorSite::parseJsonResponse (
std::string const& res,
std::size_t siteIdx,
std::lock_guard<std::mutex>& lock)
std::lock_guard<std::mutex>& sites_lock)
{
Json::Reader r;
Json::Value body;
@@ -370,10 +430,15 @@ ValidatorSite::parseJsonResponse (
if (body.isMember ("refresh_interval") &&
body["refresh_interval"].isNumeric ())
{
// TODO: should we sanity check/clamp this value
// to something reasonable?
sites_[siteIdx].refreshInterval =
std::chrono::minutes{body["refresh_interval"].asUInt ()};
using namespace std::chrono_literals;
std::chrono::minutes const refresh =
boost::algorithm::clamp(
std::chrono::minutes {body["refresh_interval"].asUInt ()},
1min,
24h);
sites_[siteIdx].refreshInterval = refresh;
sites_[siteIdx].nextRefresh =
clock_type::now() + sites_[siteIdx].refreshInterval;
}
}
@@ -381,7 +446,7 @@ std::shared_ptr<ValidatorSite::Site::Resource>
ValidatorSite::processRedirect (
detail::response_type& res,
std::size_t siteIdx,
std::lock_guard<std::mutex>& lock)
std::lock_guard<std::mutex>& sites_lock)
{
using namespace boost::beast::http;
std::shared_ptr<Site::Resource> newLocation;
@@ -395,7 +460,7 @@ ValidatorSite::processRedirect (
throw std::runtime_error{"missing location"};
}
if (sites_[siteIdx].redirCount == MAX_REDIRECTS)
if (sites_[siteIdx].redirCount == max_redirects)
{
JLOG (j_.warn()) <<
"Exceeded max redirects for validator list at " <<
@@ -435,6 +500,8 @@ ValidatorSite::onSiteFetch(
{
{
std::lock_guard <std::mutex> lock_sites{sites_mutex_};
JLOG (j_.debug()) << "Got completion for "
<< sites_[siteIdx].activeResource->uri;
auto onError = [&](std::string const& errMsg, bool retry)
{
sites_[siteIdx].lastRefreshStatus.emplace(
@@ -443,7 +510,7 @@ ValidatorSite::onSiteFetch(
errMsg});
if (retry)
sites_[siteIdx].nextRefresh =
clock_type::now() + ERROR_RETRY_INTERVAL;
clock_type::now() + error_retry_interval;
};
if (ec)
{
@@ -506,7 +573,7 @@ ValidatorSite::onSiteFetch(
std::lock_guard <std::mutex> lock_state{state_mutex_};
fetching_ = false;
if (! stopping_)
setTimer ();
setTimer (lock_state);
cv_.notify_all();
}
@@ -547,7 +614,7 @@ ValidatorSite::onTextFetch(
std::lock_guard <std::mutex> lock_state{state_mutex_};
fetching_ = false;
if (! stopping_)
setTimer ();
setTimer (lock_state);
cv_.notify_all();
}