diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index ceda0df4b..0269d483c 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -386,6 +386,7 @@ target_sources (rippled PRIVATE src/ripple/app/misc/NegativeUNLVote.cpp src/ripple/app/misc/NetworkOPs.cpp src/ripple/app/misc/SHAMapStoreImp.cpp + src/ripple/app/misc/detail/impl/WorkSSL.cpp src/ripple/app/misc/impl/AccountTxPaging.cpp src/ripple/app/misc/impl/AmendmentTable.cpp src/ripple/app/misc/impl/LoadFeeTrack.cpp @@ -663,6 +664,7 @@ target_sources (rippled PRIVATE src/test/app/DeliverMin_test.cpp src/test/app/DepositAuth_test.cpp src/test/app/Discrepancy_test.cpp + src/test/app/DNS_test.cpp src/test/app/Escrow_test.cpp src/test/app/FeeVote_test.cpp src/test/app/Flow_test.cpp diff --git a/src/ripple/app/misc/ValidatorSite.h b/src/ripple/app/misc/ValidatorSite.h index 9adb88f05..d77cfb087 100644 --- a/src/ripple/app/misc/ValidatorSite.h +++ b/src/ripple/app/misc/ValidatorSite.h @@ -27,10 +27,12 @@ #include #include #include + #include -#include + #include #include +#include namespace ripple { @@ -71,6 +73,7 @@ class ValidatorSite private: using error_code = boost::system::error_code; using clock_type = std::chrono::system_clock; + using endpoint_type = boost::asio::ip::tcp::endpoint; struct Site { @@ -106,7 +109,9 @@ private: unsigned short redirCount; std::chrono::minutes refreshInterval; clock_type::time_point nextRefresh; - boost::optional lastRefreshStatus; + std::optional lastRefreshStatus; + endpoint_type lastRequestEndpoint; + bool lastRequestSuccessful; }; Application& app_; @@ -135,7 +140,7 @@ private: public: ValidatorSite( Application& app, - boost::optional j = boost::none, + std::optional j = std::nullopt, std::chrono::seconds timeout = std::chrono::seconds{20}); ~ValidatorSite(); @@ -206,6 +211,7 @@ private: void onSiteFetch( boost::system::error_code const& ec, + endpoint_type const& endpoint, detail::response_type&& res, std::size_t siteIdx); diff --git a/src/ripple/app/misc/detail/WorkBase.h b/src/ripple/app/misc/detail/WorkBase.h index 01d0c380f..4b2c88f71 100644 --- a/src/ripple/app/misc/detail/WorkBase.h +++ b/src/ripple/app/misc/detail/WorkBase.h @@ -21,13 +21,17 @@ #define RIPPLE_APP_MISC_DETAIL_WORKBASE_H_INCLUDED #include +#include #include + #include #include #include #include #include +#include + namespace ripple { namespace detail { @@ -37,16 +41,16 @@ class WorkBase : public Work { protected: using error_code = boost::system::error_code; + using endpoint_type = boost::asio::ip::tcp::endpoint; public: - using callback_type = - std::function; + using callback_type = std::function< + void(error_code const&, endpoint_type const&, response_type&&)>; protected: using socket_type = boost::asio::ip::tcp::socket; - using endpoint_type = boost::asio::ip::tcp::endpoint; using resolver_type = boost::asio::ip::tcp::resolver; - using query_type = resolver_type::query; + using results_type = boost::asio::ip::tcp::resolver::results_type; using request_type = boost::beast::http::request; @@ -60,7 +64,9 @@ protected: socket_type socket_; request_type req_; response_type res_; - boost::beast::multi_buffer read_buf_; + boost::beast::multi_buffer readBuf_; + endpoint_type lastEndpoint_; + bool lastStatus_; public: WorkBase( @@ -68,6 +74,8 @@ public: std::string const& path, std::string const& port, boost::asio::io_service& ios, + endpoint_type const& lastEndpoint, + bool lastStatus, callback_type cb); ~WorkBase(); @@ -87,7 +95,7 @@ public: fail(error_code const& ec); void - onResolve(error_code const& ec, resolver_type::iterator it); + onResolve(error_code const& ec, results_type results); void onStart(); @@ -111,6 +119,8 @@ WorkBase::WorkBase( std::string const& path, std::string const& port, boost::asio::io_service& ios, + endpoint_type const& lastEndpoint, + bool lastStatus, callback_type cb) : host_(host) , path_(path) @@ -120,6 +130,8 @@ WorkBase::WorkBase( , strand_(ios) , resolver_(ios) , socket_(ios) + , lastEndpoint_{lastEndpoint} + , lastStatus_(lastStatus) { } @@ -128,6 +140,7 @@ WorkBase::~WorkBase() { if (cb_) cb_(make_error_code(boost::system::errc::not_a_socket), + lastEndpoint_, std::move(res_)); close(); } @@ -141,7 +154,8 @@ WorkBase::run() strand_.wrap(std::bind(&WorkBase::run, impl().shared_from_this()))); resolver_.async_resolve( - query_type{host_, port_}, + host_, + port_, strand_.wrap(std::bind( &WorkBase::onResolve, impl().shared_from_this(), @@ -170,20 +184,54 @@ WorkBase::fail(error_code const& ec) { if (cb_) { - cb_(ec, std::move(res_)); + cb_(ec, lastEndpoint_, std::move(res_)); cb_ = nullptr; } } template void -WorkBase::onResolve(error_code const& ec, resolver_type::iterator it) +WorkBase::onResolve(error_code const& ec, results_type results) { if (ec) return fail(ec); + // Use last endpoint if it is successfully connected + // and is in the list, otherwise pick a random endpoint + // from the list (excluding last endpoint). If there is + // only one endpoint and it is the last endpoint then + // use the last endpoint. + lastEndpoint_ = [&]() -> endpoint_type { + int foundIndex = 0; + auto const foundIt = std::find_if( + results.begin(), results.end(), [&](endpoint_type const& e) { + if (e == lastEndpoint_) + return true; + foundIndex++; + return false; + }); + if (foundIt != results.end() && lastStatus_) + return lastEndpoint_; + else if (results.size() == 1) + return *results.begin(); + else if (foundIt == results.end()) + return *std::next(results.begin(), rand_int(results.size() - 1)); + + // lastEndpoint_ is part of the collection + // Pick a random number from the n-1 valid choices, if we use + // this as an index, note the last element will never be chosen + // and the `lastEndpoint_` index may be chosen. So when the + // `lastEndpoint_` index is chosen, that is treated as if the + // last element was chosen. + auto randIndex = + (results.size() > 2) ? rand_int(results.size() - 2) : 0; + if (randIndex == foundIndex) + randIndex = results.size() - 1; + return *std::next(results.begin(), randIndex); + }(); + socket_.async_connect( - *it, + lastEndpoint_, strand_.wrap(std::bind( &Impl::onConnect, impl().shared_from_this(), @@ -218,7 +266,7 @@ WorkBase::onRequest(error_code const& ec) boost::beast::http::async_read( impl().stream(), - read_buf_, + readBuf_, res_, strand_.wrap(std::bind( &WorkBase::onResponse, @@ -235,7 +283,7 @@ WorkBase::onResponse(error_code const& ec) close(); assert(cb_); - cb_(ec, std::move(res_)); + cb_(ec, lastEndpoint_, std::move(res_)); cb_ = nullptr; } diff --git a/src/ripple/app/misc/detail/WorkPlain.h b/src/ripple/app/misc/detail/WorkPlain.h index ab421f32b..e70d327c2 100644 --- a/src/ripple/app/misc/detail/WorkPlain.h +++ b/src/ripple/app/misc/detail/WorkPlain.h @@ -38,6 +38,8 @@ public: std::string const& path, std::string const& port, boost::asio::io_service& ios, + endpoint_type const& lastEndpoint, + bool lastStatus, callback_type cb); ~WorkPlain() = default; @@ -59,8 +61,10 @@ WorkPlain::WorkPlain( std::string const& path, std::string const& port, boost::asio::io_service& ios, + endpoint_type const& lastEndpoint, + bool lastStatus, callback_type cb) - : WorkBase(host, path, port, ios, cb) + : WorkBase(host, path, port, ios, lastEndpoint, lastStatus, cb) { } diff --git a/src/ripple/app/misc/detail/WorkSSL.h b/src/ripple/app/misc/detail/WorkSSL.h index 18214aaa2..c7e3de614 100644 --- a/src/ripple/app/misc/detail/WorkSSL.h +++ b/src/ripple/app/misc/detail/WorkSSL.h @@ -53,6 +53,8 @@ public: boost::asio::io_service& ios, beast::Journal j, Config const& config, + endpoint_type const& lastEndpoint, + bool lastStatus, callback_type cb); ~WorkSSL() = default; @@ -70,48 +72,6 @@ private: onHandshake(error_code const& ec); }; -//------------------------------------------------------------------------------ - -WorkSSL::WorkSSL( - std::string const& host, - std::string const& path, - std::string const& port, - boost::asio::io_service& ios, - beast::Journal j, - Config const& config, - callback_type cb) - : WorkBase(host, path, port, ios, cb) - , context_(config, j, boost::asio::ssl::context::tlsv12_client) - , stream_(socket_, context_.context()) -{ - auto ec = context_.preConnectVerify(stream_, host_); - if (ec) - Throw( - boost::str(boost::format("preConnectVerify: %s") % ec.message())); -} - -void -WorkSSL::onConnect(error_code const& ec) -{ - auto err = ec ? ec : context_.postConnectVerify(stream_, host_); - if (err) - return fail(err); - - stream_.async_handshake( - boost::asio::ssl::stream_base::client, - strand_.wrap(std::bind( - &WorkSSL::onHandshake, shared_from_this(), std::placeholders::_1))); -} - -void -WorkSSL::onHandshake(error_code const& ec) -{ - if (ec) - return fail(ec); - - onStart(); -} - } // namespace detail } // namespace ripple diff --git a/src/ripple/app/misc/detail/impl/WorkSSL.cpp b/src/ripple/app/misc/detail/impl/WorkSSL.cpp new file mode 100644 index 000000000..78a269e67 --- /dev/null +++ b/src/ripple/app/misc/detail/impl/WorkSSL.cpp @@ -0,0 +1,69 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include + +namespace ripple { +namespace detail { + +WorkSSL::WorkSSL( + std::string const& host, + std::string const& path, + std::string const& port, + boost::asio::io_service& ios, + beast::Journal j, + Config const& config, + endpoint_type const& lastEndpoint, + bool lastStatus, + callback_type cb) + : WorkBase(host, path, port, ios, lastEndpoint, lastStatus, cb) + , context_(config, j, boost::asio::ssl::context::tlsv12_client) + , stream_(socket_, context_.context()) +{ + auto ec = context_.preConnectVerify(stream_, host_); + if (ec) + Throw( + boost::str(boost::format("preConnectVerify: %s") % ec.message())); +} + +void +WorkSSL::onConnect(error_code const& ec) +{ + auto err = ec ? ec : context_.postConnectVerify(stream_, host_); + if (err) + return fail(err); + + stream_.async_handshake( + boost::asio::ssl::stream_base::client, + strand_.wrap(std::bind( + &WorkSSL::onHandshake, shared_from_this(), std::placeholders::_1))); +} + +void +WorkSSL::onHandshake(error_code const& ec) +{ + if (ec) + return fail(ec); + + onStart(); +} + +} // namespace detail + +} // namespace ripple diff --git a/src/ripple/app/misc/impl/ValidatorSite.cpp b/src/ripple/app/misc/impl/ValidatorSite.cpp index aa2025258..17d05e2b0 100644 --- a/src/ripple/app/misc/impl/ValidatorSite.cpp +++ b/src/ripple/app/misc/impl/ValidatorSite.cpp @@ -82,12 +82,14 @@ ValidatorSite::Site::Site(std::string uri) , redirCount{0} , refreshInterval{default_refresh_interval} , nextRefresh{clock_type::now()} + , lastRequestEndpoint{} + , lastRequestSuccessful{false} { } ValidatorSite::ValidatorSite( Application& app, - boost::optional j, + std::optional j, std::chrono::seconds timeout) : app_{app} , j_{j ? *j : app_.logs().journal("ValidatorSite")} @@ -240,9 +242,11 @@ ValidatorSite::makeRequest( } }; auto onFetch = [this, siteIdx, timeoutCancel]( - error_code const& err, detail::response_type&& resp) { + error_code const& err, + endpoint_type const& endpoint, + detail::response_type&& resp) { timeoutCancel(); - onSiteFetch(err, std::move(resp), siteIdx); + onSiteFetch(err, endpoint, std::move(resp), siteIdx); }; auto onFetchFile = [this, siteIdx, timeoutCancel]( @@ -263,6 +267,8 @@ ValidatorSite::makeRequest( app_.getIOService(), j_, app_.config(), + sites_[siteIdx].lastRequestEndpoint, + sites_[siteIdx].lastRequestSuccessful, onFetch); } else if (resource->pUrl.scheme == "http") @@ -272,6 +278,8 @@ ValidatorSite::makeRequest( resource->pUrl.path, std::to_string(*resource->pUrl.port), app_.getIOService(), + sites_[siteIdx].lastRequestEndpoint, + sites_[siteIdx].lastRequestSuccessful, onFetch); } else @@ -281,6 +289,7 @@ ValidatorSite::makeRequest( resource->pUrl.path, app_.getIOService(), onFetchFile); } + sites_[siteIdx].lastRequestSuccessful = false; work_ = sp; sp->run(); // start a timer for the request, which shouldn't take more @@ -315,9 +324,9 @@ ValidatorSite::onTimer(std::size_t siteIdx, error_code const& ec) if (ec) { // Restart the timer if any errors are encountered, unless the error - // is from the wait operating being aborted due to a shutdown request. + // is from the wait operation being aborted due to a shutdown request. if (ec != boost::asio::error::operation_aborted) - onSiteFetch(ec, detail::response_type{}, siteIdx); + onSiteFetch(ec, {}, detail::response_type{}, siteIdx); return; } @@ -334,6 +343,7 @@ ValidatorSite::onTimer(std::size_t siteIdx, error_code const& ec) { onSiteFetch( boost::system::error_code{-1, boost::system::generic_category()}, + {}, detail::response_type{}, siteIdx); } @@ -472,13 +482,17 @@ ValidatorSite::processRedirect( void ValidatorSite::onSiteFetch( boost::system::error_code const& ec, + endpoint_type const& endpoint, detail::response_type&& res, std::size_t siteIdx) { { std::lock_guard lock_sites{sites_mutex_}; + if (endpoint != endpoint_type{}) + sites_[siteIdx].lastRequestEndpoint = endpoint; JLOG(j_.debug()) << "Got completion for " - << sites_[siteIdx].activeResource->uri; + << sites_[siteIdx].activeResource->uri << " " + << endpoint; auto onError = [&](std::string const& errMsg, bool retry) { sites_[siteIdx].lastRefreshStatus.emplace(Site::Status{ clock_type::now(), ListDisposition::invalid, errMsg}); @@ -492,9 +506,10 @@ ValidatorSite::onSiteFetch( }; if (ec) { - JLOG(j_.warn()) << "Problem retrieving from " - << sites_[siteIdx].activeResource->uri << " " - << ec.value() << ":" << ec.message(); + JLOG(j_.warn()) + << "Problem retrieving from " + << sites_[siteIdx].activeResource->uri << " " << endpoint << " " + << ec.value() << ":" << ec.message(); onError("fetch error", true); } else @@ -505,6 +520,7 @@ ValidatorSite::onSiteFetch( switch (res.result()) { case status::ok: + sites_[siteIdx].lastRequestSuccessful = true; parseJsonResponse(res.body(), siteIdx, lock_sites); break; case status::moved_permanently: @@ -527,7 +543,8 @@ ValidatorSite::onSiteFetch( default: { JLOG(j_.warn()) << "Request for validator list at " - << sites_[siteIdx].activeResource->uri + << sites_[siteIdx].activeResource->uri << " " + << endpoint << " returned bad status: " << res.result_int(); onError("bad result code", true); } @@ -566,6 +583,8 @@ ValidatorSite::onTextFetch( throw std::runtime_error{"fetch error"}; } + sites_[siteIdx].lastRequestSuccessful = true; + parseJsonResponse(res, siteIdx, lock_sites); } catch (std::exception& ex) diff --git a/src/test/app/DNS_test.cpp b/src/test/app/DNS_test.cpp new file mode 100644 index 000000000..6ae4bc8d6 --- /dev/null +++ b/src/test/app/DNS_test.cpp @@ -0,0 +1,132 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include + +#include +#include + +namespace ripple { +namespace test { + +class DNS_test : public beast::unit_test::suite +{ + using endpoint_type = boost::asio::ip::tcp::endpoint; + using error_code = boost::system::error_code; + std::weak_ptr work_; + endpoint_type lastEndpoint_{}; + parsedURL pUrl_; + std::string port_; + jtx::Env env_; + std::map resolved_; + std::mutex mutex_; + std::condition_variable cv_; + +public: + DNS_test() : env_(*this) + { + } + + void + makeRequest(endpoint_type const& lastEndpoint, bool lastStatus) + { + auto onFetch = [&](error_code const& errorCode, + endpoint_type const& endpoint, + ripple::detail::response_type&& resp) { + BEAST_EXPECT(!errorCode); + lastEndpoint_ = endpoint; + resolved_[endpoint.address().to_string()]++; + cv_.notify_all(); + }; + + auto sp = std::make_shared( + pUrl_.domain, + pUrl_.path, + port_, + env_.app().getIOService(), + env_.journal, + env_.app().config(), + lastEndpoint, + lastStatus, + onFetch); + work_ = sp; + sp->run(); + + std::unique_lock l(mutex_); + cv_.wait(l); + } + + bool + isMultipleEndpoints() + { + using boost::asio::ip::tcp; + tcp::resolver resolver(env_.app().getIOService()); + std::string port = pUrl_.port ? std::to_string(*pUrl_.port) : "443"; + tcp::resolver::iterator it = resolver.resolve(pUrl_.domain, port); + tcp::resolver::iterator end; + int n = 0; + for (; it != end; ++it) + ++n; + return n > 1; + } + + void + parse() + { + std::string url = arg(); + if (url == "") + url = "https://vl.ripple.com"; + BEAST_EXPECT(parseUrl(pUrl_, url)); + port_ = pUrl_.port ? std::to_string(*pUrl_.port) : "443"; + } + + void + run() override + { + parse(); + // First endpoint is random. Next three + // should resolve to the same endpoint. Run a few times + // to verify we are not selecting by chance the same endpoint. + for (int i = 1; i <= 4; ++i) + { + makeRequest(lastEndpoint_, true); + BEAST_EXPECT( + resolved_.size() == 1 && resolved_.begin()->second == i); + } + if (!isMultipleEndpoints()) + return; + // Run with the "failed" status. In this case endpoints are selected at + // random but last endpoint is not selected. + resolved_.clear(); + for (int i = 0; i < 4; ++i) + makeRequest(lastEndpoint_, false); + // Should have more than one but some endpoints can repeat since + // selected at random. We'll never have four identical endpoints + // here because on failure we randomly select an endpoint different + // from the last endpoint. + BEAST_EXPECT(resolved_.size() > 1); + } +}; + +BEAST_DEFINE_TESTSUITE_MANUAL_PRIO(DNS, ripple_data, ripple, 20); + +} // namespace test +} // namespace ripple