Accept redirects from validator list sites:

Honor location header/redirect from validator sites. Limit retries per
refresh interval to 3. Shorten refresh interval after HTTP/network errors.

Fixes: RIPD-1669
This commit is contained in:
Mike Ellery
2018-10-08 09:59:20 -07:00
parent 3e22a1e9e8
commit bdaad19e70
5 changed files with 497 additions and 235 deletions

View File

@@ -31,6 +31,30 @@ namespace ripple {
// default site query frequency - 5 minutes
auto constexpr DEFAULT_REFRESH_INTERVAL = std::chrono::minutes{5};
auto constexpr ERROR_RETRY_INTERVAL = std::chrono::seconds{30};
unsigned short constexpr MAX_REDIRECTS = 3;
ValidatorSite::Site::Resource::Resource (std::string u)
: uri {std::move(u)}
{
if (! parseUrl (pUrl, uri) ||
(pUrl.scheme != "http" && pUrl.scheme != "https"))
{
throw std::runtime_error {"invalid url"};
}
if (! pUrl.port)
pUrl.port = (pUrl.scheme == "https") ? 443 : 80;
}
ValidatorSite::Site::Site (std::string uri)
: loadedResource {std::make_shared<Resource>(std::move(uri))}
, startingResource {loadedResource}
, redirCount {0}
, refreshInterval {DEFAULT_REFRESH_INTERVAL}
, nextRefresh {clock_type::now()}
{
}
ValidatorSite::ValidatorSite (
boost::asio::io_service& ios,
@@ -74,20 +98,16 @@ ValidatorSite::load (
for (auto uri : siteURIs)
{
parsedURL pUrl;
if (! parseUrl (pUrl, uri) ||
(pUrl.scheme != "http" && pUrl.scheme != "https"))
try
{
sites_.emplace_back (uri);
}
catch (std::exception &)
{
JLOG (j_.error()) <<
"Invalid validator site uri: " << uri;
return false;
}
if (! pUrl.port)
pUrl.port = (pUrl.scheme == "https") ? 443 : 80;
sites_.push_back ({
uri, pUrl, DEFAULT_REFRESH_INTERVAL, clock_type::now()});
}
JLOG (j_.debug()) <<
@@ -149,6 +169,45 @@ ValidatorSite::setTimer ()
}
}
void
ValidatorSite::makeRequest (
Site::ResourcePtr resource,
std::size_t siteIdx,
std::lock_guard<std::mutex>& lock)
{
fetching_ = true;
sites_[siteIdx].activeResource = resource;
std::shared_ptr<detail::Work> sp;
auto onFetch =
[this, siteIdx] (error_code const& err, detail::response_type&& resp)
{
onSiteFetch (err, std::move(resp), siteIdx);
};
if (resource->pUrl.scheme == "https")
{
sp = std::make_shared<detail::WorkSSL>(
resource->pUrl.domain,
resource->pUrl.path,
std::to_string(*resource->pUrl.port),
ios_,
j_,
onFetch);
}
else
{
sp = std::make_shared<detail::WorkPlain>(
resource->pUrl.domain,
resource->pUrl.path,
std::to_string(*resource->pUrl.port),
ios_,
onFetch);
}
work_ = sp;
sp->run ();
}
void
ValidatorSite::onTimer (
std::size_t siteIdx,
@@ -165,40 +224,145 @@ ValidatorSite::onTimer (
std::lock_guard <std::mutex> lock{sites_mutex_};
sites_[siteIdx].nextRefresh =
clock_type::now() + DEFAULT_REFRESH_INTERVAL;
clock_type::now() + sites_[siteIdx].refreshInterval;
assert(! fetching_);
fetching_ = true;
sites_[siteIdx].redirCount = 0;
makeRequest(sites_[siteIdx].startingResource, siteIdx, lock);
}
std::shared_ptr<detail::Work> sp;
if (sites_[siteIdx].pUrl.scheme == "https")
void
ValidatorSite::parseJsonResponse (
detail::response_type& res,
std::size_t siteIdx,
std::lock_guard<std::mutex>& lock)
{
Json::Reader r;
Json::Value body;
if (! r.parse(res.body().data(), body))
{
sp = std::make_shared<detail::WorkSSL>(
sites_[siteIdx].pUrl.domain,
sites_[siteIdx].pUrl.path,
std::to_string(*sites_[siteIdx].pUrl.port),
ios_,
j_,
[this, siteIdx](error_code const& err, detail::response_type&& resp)
{
onSiteFetch (err, std::move(resp), siteIdx);
});
JLOG (j_.warn()) <<
"Unable to parse JSON response from " <<
sites_[siteIdx].activeResource->uri;
throw std::runtime_error{"bad json"};
}
if( ! body.isObject () ||
! body.isMember("blob") || ! body["blob"].isString () ||
! body.isMember("manifest") || ! body["manifest"].isString () ||
! body.isMember("signature") || ! body["signature"].isString() ||
! body.isMember("version") || ! body["version"].isInt())
{
JLOG (j_.warn()) <<
"Missing fields in JSON response from " <<
sites_[siteIdx].activeResource->uri;
throw std::runtime_error{"missing fields"};
}
auto const disp = validators_.applyList (
body["manifest"].asString (),
body["blob"].asString (),
body["signature"].asString(),
body["version"].asUInt());
sites_[siteIdx].lastRefreshStatus.emplace(
Site::Status{clock_type::now(), disp, ""});
if (ListDisposition::accepted == disp)
{
JLOG (j_.debug()) <<
"Applied new validator list from " <<
sites_[siteIdx].activeResource->uri;
}
else if (ListDisposition::same_sequence == disp)
{
JLOG (j_.debug()) <<
"Validator list with current sequence from " <<
sites_[siteIdx].activeResource->uri;
}
else if (ListDisposition::stale == disp)
{
JLOG (j_.warn()) <<
"Stale validator list from " <<
sites_[siteIdx].activeResource->uri;
}
else if (ListDisposition::untrusted == disp)
{
JLOG (j_.warn()) <<
"Untrusted validator list from " <<
sites_[siteIdx].activeResource->uri;
}
else if (ListDisposition::invalid == disp)
{
JLOG (j_.warn()) <<
"Invalid validator list from " <<
sites_[siteIdx].activeResource->uri;
}
else if (ListDisposition::unsupported_version == disp)
{
JLOG (j_.warn()) <<
"Unsupported version validator list from " <<
sites_[siteIdx].activeResource->uri;
}
else
{
sp = std::make_shared<detail::WorkPlain>(
sites_[siteIdx].pUrl.domain,
sites_[siteIdx].pUrl.path,
std::to_string(*sites_[siteIdx].pUrl.port),
ios_,
[this, siteIdx](error_code const& err, detail::response_type&& resp)
{
onSiteFetch (err, std::move(resp), siteIdx);
});
BOOST_ASSERT(false);
}
work_ = sp;
sp->run ();
if (body.isMember ("refresh_interval") &&
body["refresh_interval"].isNumeric ())
{
// TODO: should we sanity check/clamp this value
// to something reasonable?
sites_[siteIdx].refreshInterval =
std::chrono::minutes{body["refresh_interval"].asUInt ()};
}
}
ValidatorSite::Site::ResourcePtr
ValidatorSite::processRedirect (
detail::response_type& res,
std::size_t siteIdx,
std::lock_guard<std::mutex>& lock)
{
using namespace boost::beast::http;
Site::ResourcePtr newLocation;
if (res.find(field::location) == res.end() ||
res[field::location].empty())
{
JLOG (j_.warn()) <<
"Request for validator list at " <<
sites_[siteIdx].activeResource->uri <<
" returned a redirect with no Location.";
throw std::runtime_error{"missing location"};
}
if (sites_[siteIdx].redirCount == MAX_REDIRECTS)
{
JLOG (j_.warn()) <<
"Exceeded max redirects for validator list at " <<
sites_[siteIdx].loadedResource->uri ;
throw std::runtime_error{"max redirects"};
}
JLOG (j_.debug()) <<
"Got redirect for validator list from " <<
sites_[siteIdx].activeResource->uri <<
" to new location " << res[field::location];
try
{
newLocation = std::make_shared<Site::Resource>(
std::string(res[field::location]));
sites_[siteIdx].redirCount++;
}
catch (std::exception &)
{
JLOG (j_.error()) <<
"Invalid redirect location: " << res[field::location];
throw;
}
return newLocation;
}
void
@@ -207,111 +371,77 @@ ValidatorSite::onSiteFetch(
detail::response_type&& res,
std::size_t siteIdx)
{
if (! ec && res.result() != boost::beast::http::status::ok)
Site::ResourcePtr newLocation;
bool shouldRetry = false;
{
std::lock_guard <std::mutex> lock{sites_mutex_};
JLOG (j_.warn()) <<
"Request for validator list at " <<
sites_[siteIdx].uri << " returned " << res.result_int();
sites_[siteIdx].lastRefreshStatus.emplace(
Site::Status{clock_type::now(), ListDisposition::invalid});
}
else if (! ec)
{
std::lock_guard <std::mutex> lock{sites_mutex_};
Json::Reader r;
Json::Value body;
if (r.parse(res.body().data(), body) &&
body.isObject () &&
body.isMember("blob") && body["blob"].isString () &&
body.isMember("manifest") && body["manifest"].isString () &&
body.isMember("signature") && body["signature"].isString() &&
body.isMember("version") && body["version"].isInt())
std::lock_guard <std::mutex> lock_sites{sites_mutex_};
try
{
auto const disp = validators_.applyList (
body["manifest"].asString (),
body["blob"].asString (),
body["signature"].asString(),
body["version"].asUInt());
sites_[siteIdx].lastRefreshStatus.emplace(
Site::Status{clock_type::now(), disp});
if (ListDisposition::accepted == disp)
{
JLOG (j_.debug()) <<
"Applied new validator list from " <<
sites_[siteIdx].uri;
}
else if (ListDisposition::same_sequence == disp)
{
JLOG (j_.debug()) <<
"Validator list with current sequence from " <<
sites_[siteIdx].uri;
}
else if (ListDisposition::stale == disp)
if (ec)
{
JLOG (j_.warn()) <<
"Stale validator list from " << sites_[siteIdx].uri;
}
else if (ListDisposition::untrusted == disp)
{
JLOG (j_.warn()) <<
"Untrusted validator list from " <<
sites_[siteIdx].uri;
}
else if (ListDisposition::invalid == disp)
{
JLOG (j_.warn()) <<
"Invalid validator list from " <<
sites_[siteIdx].uri;
}
else if (ListDisposition::unsupported_version == disp)
{
JLOG (j_.warn()) <<
"Unsupported version validator list from " <<
sites_[siteIdx].uri;
"Problem retrieving from " <<
sites_[siteIdx].activeResource->uri <<
" " <<
ec.value() <<
":" <<
ec.message();
shouldRetry = true;
throw std::runtime_error{"fetch error"};
}
else
{
BOOST_ASSERT(false);
}
using namespace boost::beast::http;
if (res.result() == status::ok)
{
parseJsonResponse(res, siteIdx, lock_sites);
}
else if (res.result() == status::moved_permanently ||
res.result() == status::permanent_redirect ||
res.result() == status::found ||
res.result() == status::temporary_redirect)
{
newLocation = processRedirect (res, siteIdx, lock_sites);
// for perm redirects, also update our starting URI
if (res.result() == status::moved_permanently ||
res.result() == status::permanent_redirect)
{
sites_[siteIdx].startingResource = newLocation;
}
}
else
{
JLOG (j_.warn()) <<
"Request for validator list at " <<
sites_[siteIdx].activeResource->uri <<
" returned bad status: " <<
res.result_int();
shouldRetry = true;
throw std::runtime_error{"bad result code"};
}
if (body.isMember ("refresh_interval") &&
body["refresh_interval"].isNumeric ())
{
sites_[siteIdx].refreshInterval =
std::chrono::minutes{body["refresh_interval"].asUInt ()};
if (newLocation)
{
makeRequest(newLocation, siteIdx, lock_sites);
return; // we are still fetching, so skip
// state update/notify below
}
}
}
else
catch (std::exception& ex)
{
JLOG (j_.warn()) <<
"Unable to parse JSON response from " <<
sites_[siteIdx].uri;
sites_[siteIdx].lastRefreshStatus.emplace(
Site::Status{clock_type::now(), ListDisposition::invalid});
Site::Status{clock_type::now(),
ListDisposition::invalid,
ex.what()});
if (shouldRetry)
sites_[siteIdx].nextRefresh =
clock_type::now() + ERROR_RETRY_INTERVAL;
}
}
else
{
std::lock_guard <std::mutex> lock{sites_mutex_};
sites_[siteIdx].lastRefreshStatus.emplace(
Site::Status{clock_type::now(), ListDisposition::invalid});
JLOG (j_.warn()) <<
"Problem retrieving from " <<
sites_[siteIdx].uri <<
" " <<
ec.value() <<
":" <<
ec.message();
sites_[siteIdx].activeResource.reset();
}
std::lock_guard <std::mutex> lock{state_mutex_};
std::lock_guard <std::mutex> lock_state{state_mutex_};
fetching_ = false;
if (! stopping_)
setTimer ();
@@ -331,15 +461,22 @@ ValidatorSite::getJson() const
for (Site const& site : sites_)
{
Json::Value& v = jSites.append(Json::objectValue);
v[jss::uri] = site.uri;
std::stringstream uri;
uri << site.loadedResource->uri;
if (site.loadedResource != site.startingResource)
uri << " (redirects to " << site.startingResource->uri + ")";
v[jss::uri] = uri.str();
v[jss::next_refresh_time] = to_string(site.nextRefresh);
if (site.lastRefreshStatus)
{
v[jss::last_refresh_time] =
to_string(site.lastRefreshStatus->refreshed);
v[jss::last_refresh_status] =
to_string(site.lastRefreshStatus->disposition);
if (! site.lastRefreshStatus->message.empty())
v[jss::last_refresh_message] =
site.lastRefreshStatus->message;
}
v[jss::refresh_interval_min] =
static_cast<Int>(site.refreshInterval.count());
}