diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index 449cc00e14..98376d276e 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -2494,6 +2494,11 @@ + + True + + + True diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 1efd106d52..31252e9dc3 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -3513,6 +3513,12 @@ ripple\nodestore + + ripple\overlay\impl + + + ripple\overlay\impl + ripple\overlay\impl diff --git a/src/ripple/overlay/Message.h b/src/ripple/overlay/Message.h index 32f4929cc7..f3cc5487ca 100644 --- a/src/ripple/overlay/Message.h +++ b/src/ripple/overlay/Message.h @@ -122,7 +122,6 @@ public: return type(buffers_begin(buffers), buffers_end(buffers)); } - /** @} */ private: diff --git a/src/ripple/overlay/impl/ConnectAttempt.cpp b/src/ripple/overlay/impl/ConnectAttempt.cpp new file mode 100644 index 0000000000..8e4192292f --- /dev/null +++ b/src/ripple/overlay/impl/ConnectAttempt.cpp @@ -0,0 +1,620 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include + +namespace ripple { + +ConnectAttempt::ConnectAttempt (boost::asio::io_service& io_service, + endpoint_type const& remote_endpoint, Resource::Consumer usage, + beast::asio::ssl_bundle::shared_context const& context, + std::uint32_t id, beast::Journal journal, + OverlayImpl& overlay) + : Child (overlay) + , id_ (id) + , sink_ (journal, OverlayImpl::makePrefix(id)) + , journal_ (sink_) + , remote_endpoint_ (remote_endpoint) + , usage_ (usage) + , strand_ (io_service) + , timer_ (io_service) + , ssl_bundle_ (std::make_unique( + context, io_service)) + , socket_ (ssl_bundle_->socket) + , stream_ (ssl_bundle_->stream) + , parser_ ( + [&](void const* data, std::size_t size) + { + body_.commit(boost::asio::buffer_copy(body_.prepare(size), + boost::asio::buffer(data, size))); + } + , response_, false) + , slot_(overlay_.peerFinder().new_outbound_slot( + beast::IPAddressConversion::from_asio(remote_endpoint))) +{ + if (journal_.trace) journal_.trace << + "Connect " << remote_endpoint; +} + +ConnectAttempt::~ConnectAttempt() +{ + if (slot_ != nullptr) + overlay_.peerFinder().on_closed(slot_); + if (journal_.trace) journal_.trace << + "~ConnectAttempt"; +} + +void +ConnectAttempt::stop() +{ + if (! strand_.running_in_this_thread()) + return strand_.post(std::bind( + &ConnectAttempt::stop, shared_from_this())); + if (stream_.next_layer().is_open()) + { + if (journal_.debug) journal_.debug << + "Stop"; + } +} + +void +ConnectAttempt::run() +{ + error_code ec; + stream_.next_layer().async_connect (remote_endpoint_, + strand_.wrap (std::bind (&ConnectAttempt::onConnect, + shared_from_this(), beast::asio::placeholders::error))); +} + +//------------------------------------------------------------------------------ + +void +ConnectAttempt::close() +{ + assert(strand_.running_in_this_thread()); + if (stream_.next_layer().is_open()) + { + error_code ec; + timer_.cancel(ec); + socket_.close(ec); + if (journal_.debug) journal_.debug << + "Closed"; + } +} + +void +ConnectAttempt::fail (std::string const& reason) +{ + assert(strand_.running_in_this_thread()); + if (stream_.next_layer().is_open()) + if (journal_.debug) journal_.debug << + reason; + close(); +} + +void +ConnectAttempt::fail (std::string const& name, error_code ec) +{ + assert(strand_.running_in_this_thread()); + if (stream_.next_layer().is_open()) + if (journal_.debug) journal_.debug << + name << ": " << ec.message(); + close(); +} + +void +ConnectAttempt::setTimer() +{ + error_code ec; + timer_.expires_from_now(std::chrono::seconds(15), ec); + if (ec) + { + if (journal_.error) journal_.error << + "setTimer: " << ec.message(); + return; + } + + timer_.async_wait(strand_.wrap(std::bind( + &ConnectAttempt::onTimer, shared_from_this(), + beast::asio::placeholders::error))); +} + +void +ConnectAttempt::cancelTimer() +{ + error_code ec; + timer_.cancel(ec); +} + +void +ConnectAttempt::onTimer (error_code ec) +{ + if (! stream_.next_layer().is_open()) + return; + if (ec == boost::asio::error::operation_aborted) + return; + if (ec) + { + // This should never happen + if (journal_.error) journal_.error << + "onTimer: " << ec.message(); + return close(); + } + fail("Timeout"); +} + +void +ConnectAttempt::onConnect (error_code ec) +{ + cancelTimer(); + + if(ec == boost::asio::error::operation_aborted) + return; + endpoint_type local_endpoint; + if(! ec) + local_endpoint = stream_.next_layer().local_endpoint(ec); + if(ec) + return fail("onConnect", ec); + if(! stream_.next_layer().is_open()) + return; + if(journal_.trace) journal_.trace << + "onConnect"; + + setTimer(); + stream_.set_verify_mode (boost::asio::ssl::verify_none); + stream_.async_handshake (boost::asio::ssl::stream_base::client, + strand_.wrap (std::bind (&ConnectAttempt::onHandshake, + shared_from_this(), beast::asio::placeholders::error))); +} + +void +ConnectAttempt::onHandshake (error_code ec) +{ + cancelTimer(); + + if(! stream_.next_layer().is_open()) + return; + if(ec == boost::asio::error::operation_aborted) + return; + + endpoint_type local_endpoint = + stream_.next_layer().local_endpoint(ec); + if(ec) + return fail("onHandshake", ec); + if(journal_.trace) journal_.trace << + "onHandshake"; + + if (! overlay_.peerFinder().onConnected (slot_, + beast::IPAddressConversion::from_asio (local_endpoint))) + return fail("Duplicate connection"); + + if (! overlay_.setup().http_handshake) + return doLegacy(); + + bool success; + uint256 sharedValue; + std::tie(sharedValue, success) = makeSharedValue( + stream_.native_handle(), journal_); + if (! success) + return close(); // makeSharedValue logs + + beast::http::message req = makeRequest( + remote_endpoint_.address()); + auto const hello = buildHello (sharedValue, getApp()); + appendHello (req, hello); + + using beast::http::write; + write (write_buf_, req); + + setTimer(); + stream_.async_write_some (write_buf_.data(), + strand_.wrap (std::bind (&ConnectAttempt::onWrite, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} + +void +ConnectAttempt::onWrite (error_code ec, std::size_t bytes_transferred) +{ + cancelTimer(); + + if(! stream_.next_layer().is_open()) + return; + if(ec == boost::asio::error::operation_aborted) + return; + if(ec) + return fail("onWrite", ec); + if(journal_.trace) journal_.trace << + "onWrite: " << bytes_transferred << " bytes"; + + write_buf_.consume (bytes_transferred); + if (write_buf_.size() == 0) + return onRead (error_code(), 0); + + setTimer(); + stream_.async_write_some (write_buf_.data(), + strand_.wrap (std::bind (&ConnectAttempt::onWrite, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} + +void +ConnectAttempt::onRead (error_code ec, std::size_t bytes_transferred) +{ + cancelTimer(); + + if(! stream_.next_layer().is_open()) + return; + if(ec == boost::asio::error::operation_aborted) + return; + if(ec == boost::asio::error::eof) + { + if(journal_.info) journal_.info << + "EOF"; + setTimer(); + return stream_.async_shutdown(strand_.wrap(std::bind( + &ConnectAttempt::onShutdown, shared_from_this(), + beast::asio::placeholders::error))); + } + if(ec) + return fail("onRead", ec); + if(journal_.trace) + { + if(bytes_transferred > 0) journal_.trace << + "onRead: " << bytes_transferred << " bytes"; + else journal_.trace << + "onRead"; + } + + if (! ec) + { + write_buf_.commit (bytes_transferred); + std::size_t bytes_consumed; + std::tie (ec, bytes_consumed) = parser_.write( + write_buf_.data()); + if (! ec) + { + write_buf_.consume (bytes_consumed); + if (parser_.complete()) + return processResponse(response_, body_); + } + } + + if (ec) + return fail("onRead", ec); + + setTimer(); + stream_.async_read_some (write_buf_.prepare (Tuning::readBufferBytes), + strand_.wrap (std::bind (&ConnectAttempt::onRead, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} + +void +ConnectAttempt::onShutdown (error_code ec) +{ + cancelTimer(); + if (! ec) + { + if (journal_.error) journal_.error << + "onShutdown: expected error condition"; + return close(); + } + if (ec != boost::asio::error::eof) + return fail("onShutdown", ec); + close(); +} + +//-------------------------------------------------------------------------- + +// Perform a legacy outgoing connection +void +ConnectAttempt::doLegacy() +{ + if(journal_.trace) journal_.trace << + "doLegacy"; + + bool success; + uint256 sharedValue; + std::tie(sharedValue, success) = makeSharedValue( + stream_.native_handle(), journal_); + if (! success) + return fail("hello"); + + auto const hello = buildHello(sharedValue, getApp()); + write (write_buf_, hello, protocol::mtHELLO, + Tuning::readBufferBytes); + + stream_.async_write_some (write_buf_.data(), + strand_.wrap (std::bind (&ConnectAttempt::onWriteHello, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); + + // Timer gets reset after header AND body received + setTimer(); + boost::asio::async_read (stream_, read_buf_.prepare ( + Message::kHeaderBytes), boost::asio::transfer_exactly ( + Message::kHeaderBytes), strand_.wrap (std::bind ( + &ConnectAttempt::onReadHeader, shared_from_this(), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} + +void +ConnectAttempt::onWriteHello (error_code ec, std::size_t bytes_transferred) +{ + if(! stream_.next_layer().is_open()) + return; + if(ec == boost::asio::error::operation_aborted) + return; + if(ec) + return fail("onWriteHello", ec); + if(journal_.trace) + { + if(bytes_transferred > 0) journal_.trace << + "onWriteHello: " << bytes_transferred << " bytes"; + else journal_.trace << + "onWriteHello"; + } + + write_buf_.consume (bytes_transferred); + if (write_buf_.size() > 0) + return stream_.async_write_some (write_buf_.data(), + strand_.wrap (std::bind (&ConnectAttempt::onWriteHello, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} + +void +ConnectAttempt::onReadHeader (error_code ec, + std::size_t bytes_transferred) +{ + if(! stream_.next_layer().is_open()) + return; + if(ec == boost::asio::error::operation_aborted) + return; + if(ec == boost::asio::error::eof) + { + if(journal_.info) journal_.info << + "EOF"; + setTimer(); + return stream_.async_shutdown(strand_.wrap(std::bind( + &ConnectAttempt::onShutdown, shared_from_this(), + beast::asio::placeholders::error))); + } + if(ec) + return fail("onReadHeader", ec); + if(journal_.trace) + { + if(bytes_transferred > 0) journal_.trace << + "onReadHeader: " << bytes_transferred << " bytes"; + else journal_.trace << + "onReadHeader"; + } + + assert(bytes_transferred == Message::kHeaderBytes); + read_buf_.commit(bytes_transferred); + + int const type = Message::type(read_buf_.data()); + if (type != protocol::mtHELLO) + return fail("Expected TMHello"); + + std::size_t const bytes_needed = + Message::size(read_buf_.data()); + + read_buf_.consume (Message::kHeaderBytes); + + boost::asio::async_read (stream_, read_buf_.prepare(bytes_needed), + boost::asio::transfer_exactly(bytes_needed), strand_.wrap ( + std::bind (&ConnectAttempt::onReadBody, shared_from_this(), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} + +void +ConnectAttempt::onReadBody (error_code ec, + std::size_t bytes_transferred) +{ + cancelTimer(); + + if(! stream_.next_layer().is_open()) + return; + if(ec == boost::asio::error::operation_aborted) + return; + if(ec == boost::asio::error::eof) + { + if(journal_.info) journal_.info << + "EOF"; + setTimer(); + return stream_.async_shutdown(strand_.wrap(std::bind( + &ConnectAttempt::onShutdown, shared_from_this(), + beast::asio::placeholders::error))); + } + if(ec) + return fail("onReadBody", ec); + if(journal_.trace) + { + if(bytes_transferred > 0) journal_.trace << + "onReadBody: " << bytes_transferred << " bytes"; + else journal_.trace << + "onReadBody"; + } + + read_buf_.commit (bytes_transferred); + + protocol::TMHello hello; + ZeroCopyInputStream< + beast::asio::streambuf::const_buffers_type> stream ( + read_buf_.data()); + if (! hello.ParseFromZeroCopyStream (&stream)) + return fail("onReadBody: parse"); + read_buf_.consume (stream.ByteCount()); + + bool success; + uint256 sharedValue; + std::tie(sharedValue, success) = makeSharedValue( + ssl_bundle_->stream.native_handle(), journal_); + if(! success) + return close(); // makeSharedValue logs + + RippleAddress publicKey; + std::tie(publicKey, success) = verifyHello (hello, + sharedValue, journal_, getApp()); + if(! success) + return close(); // verifyHello logs + + auto protocol = BuildInfo::make_protocol(hello.protoversion()); + if(journal_.info) journal_.info << + "Protocol: " << to_string(protocol); + if(journal_.info) journal_.info << + "Public Key: " << publicKey.humanNodePublic(); + std::string name; + bool const cluster = getApp().getUNL().nodeInCluster(publicKey, name); + if (cluster) + if (journal_.info) journal_.info << + "Cluster name: " << name; + + auto const result = overlay_.peerFinder().activate ( + slot_, RipplePublicKey(publicKey), cluster); + if (result != PeerFinder::Result::success) + return fail("Outbound slots full"); + + auto const peer = std::make_shared( + std::move(ssl_bundle_), read_buf_.data(), + std::move(slot_), usage_, std::move(hello), + publicKey, id_, overlay_); + + overlay_.add_active (peer); +} + +//-------------------------------------------------------------------------- + +beast::http::message +ConnectAttempt::makeRequest ( + boost::asio::ip::address const& remote_address) +{ + beast::http::message m; + m.method (beast::http::method_t::http_get); + m.url ("/"); + m.version (1, 1); + m.headers.append ("User-Agent", BuildInfo::getFullVersionString()); + m.headers.append ("Upgrade", "RTXP/1.2"); + //std::string("RTXP/") + to_string (BuildInfo::getCurrentProtocol())); + m.headers.append ("Connection", "Upgrade"); + m.headers.append ("Connect-As", "Peer"); + //m.headers.append ("Connect-As", "Leaf, Peer"); + //m.headers.append ("Accept-Encoding", "identity"); + //m.headers.append ("Local-Address", stream_. + //m.headers.append ("X-Try-IPs", "192.168.0.1:51234"); + //m.headers.append ("X-Try-IPs", "208.239.114.74:51234"); + //m.headers.append ("A", "BC"); + //m.headers.append ("Content-Length", "0"); + return m; +} + +template +void +ConnectAttempt::processResponse (beast::http::message const& m, + Streambuf const& body) +{ + if (response_.status() == 503) + { + Json::Value json; + Json::Reader r; + auto const success = r.parse(to_string(body), json); + if (success) + { + if (json.isObject() && json.isMember("peer-ips")) + { + Json::Value const& ips = json["peer-ips"]; + if (ips.isArray()) + { + std::vector eps; + eps.reserve(ips.size()); + for (auto const& v : ips) + { + if (v.isString()) + { + error_code ec; + auto const ep = parse_endpoint(v.asString(), ec); + if (!ec) + eps.push_back(ep); + } + } + overlay_.peerFinder().onRedirects( + remote_endpoint_, eps); + } + } + } + } + + if (! OverlayImpl::isPeerUpgrade(m)) + { + if (journal_.info) journal_.info << + "HTTP Response: " << m.status() << " " << m.reason(); + return close(); + } + + bool success; + protocol::TMHello hello; + std::tie(hello, success) = parseHello (response_, journal_); + if(! success) + return fail("processResponse: Bad TMHello"); + + uint256 sharedValue; + std::tie(sharedValue, success) = makeSharedValue( + ssl_bundle_->stream.native_handle(), journal_); + if(! success) + return close(); // makeSharedValue logs + + RippleAddress publicKey; + std::tie(publicKey, success) = verifyHello (hello, + sharedValue, journal_, getApp()); + if(! success) + return close(); // verifyHello logs + if(journal_.info) journal_.info << + "Public Key: " << publicKey.humanNodePublic(); + + auto const protocol = + BuildInfo::make_protocol(hello.protoversion()); + if(journal_.info) journal_.info << + "Protocol: " << to_string(protocol); + + std::string name; + bool const clusterNode = + getApp().getUNL().nodeInCluster(publicKey, name); + if (clusterNode) + if (journal_.info) journal_.info << + "Cluster name: " << name; + + auto const result = overlay_.peerFinder().activate (slot_, + RipplePublicKey(publicKey), clusterNode); + if (result != PeerFinder::Result::success) + return fail("Outbound slots full"); + + auto const peer = std::make_shared( + std::move(ssl_bundle_), read_buf_.data(), + std::move(slot_), usage_, std::move(hello), + publicKey, id_, overlay_); + + overlay_.add_active (peer); +} + +} // ripple diff --git a/src/ripple/overlay/impl/ConnectAttempt.h b/src/ripple/overlay/impl/ConnectAttempt.h new file mode 100644 index 0000000000..475f65c177 --- /dev/null +++ b/src/ripple/overlay/impl/ConnectAttempt.h @@ -0,0 +1,136 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_CONNECTATTEMPT_H_INCLUDED +#define RIPPLE_OVERLAY_CONNECTATTEMPT_H_INCLUDED + +#include "ripple.pb.h" +#include +#include +#include +#include +#include +#include // move to .cpp +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // +#include +#include + +namespace ripple { + +/** Manages an outbound connection attempt. */ +class ConnectAttempt + : public OverlayImpl::Child + , public std::enable_shared_from_this +{ +private: + using error_code = boost::system::error_code; + using endpoint_type = boost::asio::ip::tcp::endpoint; + + std::uint32_t const id_; + beast::WrappedSink sink_; + beast::Journal journal_; + endpoint_type remote_endpoint_; + Resource::Consumer usage_; + boost::asio::io_service::strand strand_; + boost::asio::basic_waitable_timer timer_; + std::unique_ptr ssl_bundle_; + beast::asio::ssl_bundle::socket_type& socket_; + beast::asio::ssl_bundle::stream_type& stream_; + beast::asio::streambuf read_buf_; + beast::asio::streambuf write_buf_; + beast::http::message response_; + beast::asio::streambuf body_; + beast::http::parser parser_; + PeerFinder::Slot::ptr slot_; + +public: + ConnectAttempt (boost::asio::io_service& io_service, + endpoint_type const& remote_endpoint, Resource::Consumer usage, + beast::asio::ssl_bundle::shared_context const& context, + std::uint32_t id, beast::Journal journal, + OverlayImpl& overlay); + + ~ConnectAttempt(); + + void + stop() override; + + void + run(); + +private: + void close(); + void fail (std::string const& reason); + void fail (std::string const& name, error_code ec); + void setTimer(); + void cancelTimer(); + void onTimer (error_code ec); + void onConnect (error_code ec); + void onHandshake (error_code ec); + void onWrite (error_code ec, std::size_t bytes_transferred); + void onRead (error_code ec, std::size_t bytes_transferred); + void onShutdown (error_code ec); + + void doLegacy(); + void onWriteHello (error_code ec, std::size_t bytes_transferred); + void onReadHeader (error_code ec, std::size_t bytes_transferred); + void onReadBody (error_code ec, std::size_t bytes_transferred); + + static + beast::http::message + makeRequest (boost::asio::ip::address const& remote_address); + + template + void processResponse (beast::http::message const& m, + Streambuf const& body); + + template + static + boost::asio::ip::tcp::endpoint + parse_endpoint (std::string const& s, boost::system::error_code& ec) + { + beast::IP::Endpoint bep; + std::istringstream is(s); + is >> bep; + if (is.fail()) + { + ec = boost::system::errc::make_error_code( + boost::system::errc::invalid_argument); + return boost::asio::ip::tcp::endpoint{}; + } + + return beast::IPAddressConversion::to_asio_endpoint(bep); + } +}; + +} + +#endif diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 8ada28109a..b0688c9d12 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -27,6 +28,7 @@ #include #include #include +#include namespace ripple { @@ -185,21 +187,25 @@ OverlayImpl::onHandoff (std::unique_ptr && ssl_bundle, beast::http::message&& request, endpoint_type remote_endpoint) { + auto const id = next_id_++; + beast::WrappedSink sink (deprecatedLogs()["Peer"], makePrefix(id)); + beast::Journal journal (sink); + Handoff handoff; if (! isPeerUpgrade(request)) return handoff; handoff.moved = true; - if (journal_.trace) journal_.trace << + if (journal.trace) journal.trace << "Peer connection upgrade from " << remote_endpoint; error_code ec; auto const local_endpoint (ssl_bundle->socket.local_endpoint(ec)); if (ec) { - if (journal_.trace) journal_.trace << - "Peer " << remote_endpoint << " failed: " << ec.message(); + if (journal.trace) journal.trace << + remote_endpoint << " failed: " << ec.message(); return handoff; } @@ -243,19 +249,19 @@ OverlayImpl::onHandoff (std::unique_ptr && ssl_bundle, bool success = true; protocol::TMHello hello; - std::tie(hello, success) = parseHello (request, journal_); + std::tie(hello, success) = parseHello (request, journal); if(! success) return handoff; uint256 sharedValue; std::tie(sharedValue, success) = makeSharedValue( - ssl_bundle->stream.native_handle(), journal_); + ssl_bundle->stream.native_handle(), journal); if(! success) return handoff; RippleAddress publicKey; std::tie(publicKey, success) = verifyHello (hello, - sharedValue, journal_, getApp()); + sharedValue, journal, getApp()); if(! success) return handoff; @@ -267,7 +273,7 @@ OverlayImpl::onHandoff (std::unique_ptr && ssl_bundle, RipplePublicKey(publicKey), cluster); if (result != PeerFinder::Result::success) { - if (journal_.trace) journal_.trace << + if (journal.trace) journal.trace << "Peer " << remote_endpoint << " redirected, slots full"; handoff.moved = false; handoff.response = makeRedirectResponse(slot, request, @@ -276,7 +282,7 @@ OverlayImpl::onHandoff (std::unique_ptr && ssl_bundle, return handoff; } - auto const peer = std::make_shared(next_id_++, + auto const peer = std::make_shared(id, remote_endpoint, slot, std::move(request), hello, publicKey, consumer, std::move(ssl_bundle), *this); { @@ -307,6 +313,14 @@ OverlayImpl::isPeerUpgrade(beast::http::message const& request) return true; } +std::string +OverlayImpl::makePrefix (std::uint32_t id) +{ + std::stringstream ss; + ss << "[" << std::setfill('0') << std::setw(3) << id << "] "; + return ss.str(); +} + std::shared_ptr OverlayImpl::makeRedirectResponse (PeerFinder::Slot::ptr const& slot, beast::http::message const& request, address_type remote_address) @@ -340,23 +354,69 @@ OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint) { assert(work_); - PeerFinder::Slot::ptr const slot = - m_peerFinder->new_outbound_slot (remote_endpoint); - if (slot == nullptr) - return; - auto const peer = std::make_shared(next_id_++, - remote_endpoint, slot, io_service_, setup_.context, *this); + auto usage = resourceManager().newOutboundEndpoint (remote_endpoint); + if (usage.disconnect()) { - // We're on the strand but lets make this code - // the same as the others to avoid confusion. - std::lock_guard lock (mutex_); - add(peer); - peer->run(); + if (journal_.info) journal_.info << + "Over resource limit: " << remote_endpoint; + return; } + + auto const p = std::make_shared( + io_service_, beast::IPAddressConversion::to_asio_endpoint(remote_endpoint), + usage, setup_.context, next_id_++, + deprecatedLogs().journal("Peer"), *this); + + std::lock_guard lock(mutex_); + list_.emplace(p.get(), p); + p->run(); } //------------------------------------------------------------------------------ +// Adds a peer that is already handshaked and active +void +OverlayImpl::add_active (std::shared_ptr const& peer) +{ + std::lock_guard lock (mutex_); + + { + auto const result = + m_peers.emplace (peer->slot(), peer); + assert (result.second); + (void) result.second; + } + + // Now track this peer + { + auto const result (m_shortIdMap.emplace ( + std::piecewise_construct, + std::make_tuple (peer->id()), + std::make_tuple (peer))); + assert(result.second); + (void) result.second; + } + + { + auto const result (m_publicKeyMap.emplace( + peer->getNodePublic(), peer)); + assert(result.second); + (void) result.second; + } + + list_.emplace(peer.get(), peer); + + journal_.debug << + "activated " << peer->getRemoteAddress() << + " (" << peer->id() << + ":" << RipplePublicKey(peer->getNodePublic()) << ")"; + + // As we are not on the strand, run() must be called + // while holding the lock, otherwise new I/O can be + // queued after a call to stop(). + peer->run(); +} + void OverlayImpl::remove (PeerFinder::Slot::ptr const& slot) { diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 4d1f502d7a..df37ed3d9f 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -30,8 +30,8 @@ #include #include #include +#include #include -#include #include #include #include @@ -68,7 +68,6 @@ private: using address_type = boost::asio::ip::address; using endpoint_type = boost::asio::ip::tcp::endpoint; using error_code = boost::system::error_code; - using yield_context = boost::asio::yield_context; struct Timer : Child @@ -171,6 +170,9 @@ public: Peer::ptr findPeerByShortID (Peer::id_t const& id) override; + void + add_active (std::shared_ptr const& peer); + void remove (PeerFinder::Slot::ptr const& slot); @@ -190,6 +192,10 @@ public: bool isPeerUpgrade (beast::http::message const& request); + static + std::string + makePrefix (std::uint32_t id); + private: std::shared_ptr makeRedirectResponse (PeerFinder::Slot::ptr const& slot, diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 9cf3be0c90..0b2850822c 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -65,30 +65,6 @@ PeerImp::PeerImp (id_t id, endpoint_type remote_endpoint, { } -PeerImp::PeerImp (id_t id, beast::IP::Endpoint remoteAddress, - PeerFinder::Slot::ptr const& slot, boost::asio::io_service& io_service, - std::shared_ptr const& context, - OverlayImpl& overlay) - : Child (overlay) - , id_(id) - , sink_(deprecatedLogs().journal("Peer"), makePrefix(id)) - , p_sink_(deprecatedLogs().journal("Protocol"), makePrefix(id)) - , journal_ (sink_) - , p_journal_(p_sink_) - , ssl_bundle_(std::make_unique( - context, io_service)) - , socket_ (ssl_bundle_->socket) - , stream_ (ssl_bundle_->stream) - , strand_ (socket_.get_io_service()) - , timer_ (socket_.get_io_service()) - , remote_address_ (remoteAddress) - , overlay_ (overlay) - , m_inbound (false) - , state_ (State::connecting) - , slot_ (slot) -{ -} - PeerImp::~PeerImp () { if (cluster()) @@ -116,7 +92,27 @@ PeerImp::run() } else { - doConnect(); + assert (state_ == State::active); + // XXX Set timer: connection is in grace period to be useful. + // XXX Set timer: connection idle (idle may vary depending on connection type.) + if ((hello_.has_ledgerclosed ()) && ( + hello_.ledgerclosed ().size () == (256 / 8))) + { + memcpy (closedLedgerHash_.begin (), + hello_.ledgerclosed ().data (), 256 / 8); + if ((hello_.has_ledgerprevious ()) && + (hello_.ledgerprevious ().size () == (256 / 8))) + { + memcpy (previousLedgerHash_.begin (), + hello_.ledgerprevious ().data (), 256 / 8); + addLedger (previousLedgerHash_); + } + else + { + previousLedgerHash_.zero(); + } + } + doProtocolStart(false); } } @@ -458,294 +454,6 @@ PeerImp::onShutdown(error_code ec) } //------------------------------------------------------------------------------ -// -// outbound -// - -void PeerImp::doConnect() -{ - if (journal_.info) journal_.info << - "Connect " << remote_address_; - usage_ = overlay_.resourceManager().newOutboundEndpoint (remote_address_); - if (usage_.disconnect()) - return fail("doConnect: Resources"); - - setTimer(); - stream_.next_layer().async_connect ( - beast::IPAddressConversion::to_asio_endpoint (remote_address_), - strand_.wrap (std::bind (&PeerImp::onConnect, - shared_from_this (), beast::asio::placeholders::error))); -} - -void -PeerImp::onConnect (error_code ec) -{ - assert(state_ == State::connecting); - cancelTimer(); - if(! socket_.is_open()) - return; - if(ec == boost::asio::error::operation_aborted) - return; - socket_type::endpoint_type local_endpoint; - if(! ec) - local_endpoint = stream_.next_layer().local_endpoint (ec); - if(ec) - return fail("onConnect", ec); - if(journal_.trace) journal_.trace << - "onConnect"; - - // VFALCO Can we do this after the call to onConnected? - state_ = State::connected; - if (! overlay_.peerFinder().onConnected (slot_, - beast::IPAddressConversion::from_asio (local_endpoint))) - return fail("onConnect: Duplicate"); - - setTimer(); - stream_.set_verify_mode (boost::asio::ssl::verify_none); - stream_.async_handshake ( - boost::asio::ssl::stream_base::client, - strand_.wrap (std::bind (&PeerImp::onHandshake, - std::static_pointer_cast (shared_from_this ()), - beast::asio::placeholders::error))); -} - -void -PeerImp::onHandshake (error_code ec) -{ - cancelTimer(); - if(! socket_.is_open()) - return; - if(ec == boost::asio::error::operation_aborted) - return; - if(ec) - return fail("onHandshake", ec); - if(journal_.trace) journal_.trace << - "onHandshake"; - if (! overlay_.setup().http_handshake) - return doProtocolStart(true); // legacy - - bool success; - std::tie(sharedValue_, success) = makeSharedValue( - stream_.native_handle(), journal_); - if (! success) - return close(); // makeSharedValue logs - - beast::http::message req = makeRequest( - beast::IPAddressConversion::to_asio_address(remote_address_)); - auto const hello = buildHello (sharedValue_, getApp()); - appendHello (req, hello); - write (write_buffer_, req); - - setTimer(); - stream_.async_write_some (write_buffer_.data(), - strand_.wrap (std::bind (&PeerImp::onWriteRequest, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); -} - -// Called repeatedly with the http request data -void -PeerImp::onWriteRequest (error_code ec, std::size_t bytes_transferred) -{ - cancelTimer(); - if(! socket_.is_open()) - return; - if(ec == boost::asio::error::operation_aborted) - return; - if(ec) - return fail("onWriteRequest", ec); - if(journal_.trace) journal_.trace << - "onWriteRequest: " << bytes_transferred << " bytes"; - - write_buffer_.consume (bytes_transferred); - if (write_buffer_.size() == 0) - { - http_parser_ = boost::in_place (std::ref(http_message_), - std::ref(http_body_), false); - return onReadResponse (error_code(), 0); - } - - setTimer(); - stream_.async_write_some (write_buffer_.data(), - strand_.wrap (std::bind (&PeerImp::onWriteRequest, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); -} - -template -boost::asio::ip::tcp::endpoint -parse_endpoint (std::string const& s, boost::system::error_code& ec) -{ - beast::IP::Endpoint bep; - std::istringstream is(s); - is >> bep; - if (is.fail()) - { - ec = boost::system::errc::make_error_code( - boost::system::errc::invalid_argument); - return boost::asio::ip::tcp::endpoint{}; - } - - return beast::IPAddressConversion::to_asio_endpoint(bep); -} - -// Called repeatedly with the http response data -void -PeerImp::onReadResponse (error_code ec, std::size_t bytes_transferred) -{ - cancelTimer(); - if(! socket_.is_open()) - return; - if(ec == boost::asio::error::operation_aborted) - return; - if(ec == boost::asio::error::eof) - { - if(journal_.info) journal_.info << - "EOF"; - setTimer(); - return stream_.async_shutdown(strand_.wrap(std::bind(&PeerImp::onShutdown, - shared_from_this(), beast::asio::placeholders::error))); - } - if(ec) - return fail("onReadResponse", ec); - if(journal_.trace) - { - if(bytes_transferred > 0) journal_.trace << - "onReadResponse: " << bytes_transferred << " bytes"; - else journal_.trace << - "onReadResponse"; - } - - if (! ec) - { - read_buffer_.commit (bytes_transferred); - std::size_t bytes_consumed; - std::tie (ec, bytes_consumed) = http_parser_->write ( - read_buffer_.data()); - if (! ec) - { - read_buffer_.consume (bytes_consumed); - if (http_parser_->complete()) - return processResponse(http_message_, http_body_); - } - } - - if (ec) - { - return fail("onReadResponse", ec); - } - - setTimer(); - stream_.async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - strand_.wrap (std::bind (&PeerImp::onReadResponse, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); -} - -template -void -PeerImp::processResponse (beast::http::message const& m, - Streambuf const& body) -{ - if (http_message_.status() == 503) - { - Json::Value json; - Json::Reader r; - auto const success = r.parse(to_string(body), json); - if (success) - { - if (json.isObject() && json.isMember("peer-ips")) - { - Json::Value const& ips = json["peer-ips"]; - if (ips.isArray()) - { - std::vector eps; - eps.reserve(ips.size()); - for (auto const& v : ips) - { - if (v.isString()) - { - error_code ec; - auto const ep = parse_endpoint(v.asString(), ec); - if (!ec) - eps.push_back(ep); - } - } - overlay_.peerFinder().onRedirects(beast::IPAddressConversion:: - to_asio_endpoint(remote_address_), eps); - } - } - } - } - - if (! OverlayImpl::isPeerUpgrade(m)) - { - if (journal_.info) journal_.info << - "HTTP Response: " << m.status() << " " << m.reason(); - return close(); - } - - bool success; - std::tie(hello_, success) = parseHello (http_message_, journal_); - if(! success) - return fail("processResponse: Bad TMHello"); - - uint256 sharedValue; - std::tie(sharedValue, success) = makeSharedValue( - ssl_bundle_->stream.native_handle(), journal_); - if(! success) - return close(); // makeSharedValue logs - - std::tie(publicKey_, success) = verifyHello (hello_, - sharedValue, journal_, getApp()); - if(! success) - return close(); // verifyHello logs - - auto protocol = BuildInfo::make_protocol(hello_.protoversion()); - if(journal_.info) journal_.info << - "Protocol: " << to_string(protocol); - if(journal_.info) journal_.info << - "Public Key: " << publicKey_.humanNodePublic(); - bool const cluster = getApp().getUNL().nodeInCluster(publicKey_, name_); - if (cluster) - if (journal_.info) journal_.info << - "Cluster name: " << name_; - - auto const result = overlay_.peerFinder().activate ( - slot_, RipplePublicKey(publicKey_), cluster); - if (result != PeerFinder::Result::success) - return fail("Outbound slots full"); - - state_ = State::active; - overlay_.activate(shared_from_this ()); - - // XXX Set timer: connection is in grace period to be useful. - // XXX Set timer: connection idle (idle may vary depending on connection type.) - if ((hello_.has_ledgerclosed ()) && ( - hello_.ledgerclosed ().size () == (256 / 8))) - { - memcpy (closedLedgerHash_.begin (), - hello_.ledgerclosed ().data (), 256 / 8); - if ((hello_.has_ledgerprevious ()) && - (hello_.ledgerprevious ().size () == (256 / 8))) - { - memcpy (previousLedgerHash_.begin (), - hello_.ledgerprevious ().data (), 256 / 8); - addLedger (previousLedgerHash_); - } - else - { - previousLedgerHash_.zero(); - } - } - - doProtocolStart(false); -} - -//------------------------------------------------------------------------------ -// -// inbound -// void PeerImp::doLegacyAccept() { @@ -825,7 +533,7 @@ PeerImp::makeResponse (beast::http::message const& req, resp.reason("Switching Protocols"); resp.version(req.version()); resp.headers.append("Connection", "Upgrade"); - resp.headers.append("Upgrade", "RTXP/1.3"); + resp.headers.append("Upgrade", "RTXP/1.2"); resp.headers.append("Connect-AS", "Peer"); resp.headers.append("Server", BuildInfo::getFullVersionString()); protocol::TMHello hello = buildHello(sharedValue, getApp()); diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 2d9389007b..0631461788 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -80,7 +80,6 @@ public: private: using clock_type = std::chrono::steady_clock; using error_code= boost::system::error_code ; - using yield_context = boost::asio::yield_context; using socket_type = boost::asio::ip::tcp::socket; using stream_type = boost::asio::ssl::stream ; using address_type = boost::asio::ip::address; @@ -174,14 +173,17 @@ public: std::unique_ptr&& ssl_bundle, OverlayImpl& overlay); - /** Create an outgoing peer. */ - PeerImp (id_t id, beast::IP::Endpoint remoteAddress, - PeerFinder::Slot::ptr const& slot, boost::asio::io_service& io_service, - std::shared_ptr const& context, - OverlayImpl& overlay); + /** Create outgoing, handshaked peer. */ + // VFALCO legacyPublicKey should be implied by the Slot + template + PeerImp (std::unique_ptr&& ssl_bundle, + Buffers const& buffers, PeerFinder::Slot::ptr&& slot, + Resource::Consumer usage, protocol::TMHello&& hello, + RippleAddress const& legacyPublicKey, id_t id, + OverlayImpl& overlay); virtual - ~PeerImp (); + ~PeerImp(); PeerFinder::Slot::ptr const& slot() @@ -308,33 +310,6 @@ private: void onShutdown (error_code ec); - // - // outbound completion path - // - - void - doConnect(); - - void - onConnect (error_code ec); - - void - onHandshake (error_code ec); - - void - onWriteRequest (error_code ec, std::size_t bytes_transferred); - - void - onReadResponse (error_code ec, std::size_t bytes_transferred); - - template - void - processResponse (beast::http::message const& m, Streambuf const& body); - - // - // inbound completion path - // - void doAccept(); @@ -472,16 +447,16 @@ private: template PeerImp::PeerImp (id_t id, endpoint_type remote_endpoint, - PeerFinder::Slot::ptr const& slot, ConstBufferSequence const& buffer, + PeerFinder::Slot::ptr const& slot, ConstBufferSequence const& buffers, std::unique_ptr&& ssl_bundle, OverlayImpl& overlay) : Child (overlay) - , id_(id) - , sink_(deprecatedLogs().journal("Peer"), makePrefix(id)) - , p_sink_(deprecatedLogs().journal("Protocol"), makePrefix(id)) + , id_ (id) + , sink_ (deprecatedLogs().journal("Peer"), makePrefix(id)) + , p_sink_ (deprecatedLogs().journal("Protocol"), makePrefix(id)) , journal_ (sink_) - , p_journal_(p_sink_) - , ssl_bundle_(std::move(ssl_bundle)) + , p_journal_ (p_sink_) + , ssl_bundle_ (std::move(ssl_bundle)) , socket_ (ssl_bundle_->socket) , stream_ (ssl_bundle_->stream) , strand_ (socket_.get_io_service()) @@ -494,7 +469,37 @@ PeerImp::PeerImp (id_t id, endpoint_type remote_endpoint, , slot_ (slot) { read_buffer_.commit(boost::asio::buffer_copy(read_buffer_.prepare( - boost::asio::buffer_size(buffer)), buffer)); + boost::asio::buffer_size(buffers)), buffers)); +} + +template +PeerImp::PeerImp (std::unique_ptr&& ssl_bundle, + Buffers const& buffers, PeerFinder::Slot::ptr&& slot, + Resource::Consumer usage, protocol::TMHello&& hello, + RippleAddress const& legacyPublicKey, id_t id, + OverlayImpl& overlay) + : Child (overlay) + , id_ (id) + , sink_ (deprecatedLogs().journal("Peer"), makePrefix(id)) + , p_sink_ (deprecatedLogs().journal("Protocol"), makePrefix(id)) + , journal_ (sink_) + , p_journal_ (p_sink_) + , ssl_bundle_(std::move(ssl_bundle)) + , socket_ (ssl_bundle_->socket) + , stream_ (ssl_bundle_->stream) + , strand_ (socket_.get_io_service()) + , timer_ (socket_.get_io_service()) + , remote_address_ (slot->remote_endpoint()) + , overlay_ (overlay) + , m_inbound (false) + , state_ (State::active) + , publicKey_ (legacyPublicKey) + , hello_ (std::move(hello)) + , usage_ (usage) + , slot_ (std::move(slot)) +{ + read_buffer_.commit (boost::asio::buffer_copy(read_buffer_.prepare( + boost::asio::buffer_size(buffers)), buffers)); } template diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index a10371ff69..429d1b8152 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -136,6 +136,29 @@ invokeProtocolMessage (Buffers const& buffers, Handler& handler) return result; } +/** Write a protocol message to a streambuf. */ +template +void +write (Streambuf& streambuf, + ::google::protobuf::Message const& m, int type, + std::size_t blockBytes) +{ + auto const size = m.ByteSize(); + std::array v; + v[0] = static_cast ((size >> 24) & 0xFF); + v[1] = static_cast ((size >> 16) & 0xFF); + v[2] = static_cast ((size >> 8) & 0xFF); + v[3] = static_cast ( size & 0xFF); + v[4] = static_cast ((type >> 8) & 0xFF); + v[5] = static_cast ( type & 0xFF); + + streambuf.commit(boost::asio::buffer_copy( + streambuf.prepare(Message::kHeaderBytes), boost::asio::buffer(v))); + ZeroCopyOutputStream stream ( + streambuf, blockBytes); + m.SerializeToZeroCopyStream(&stream); +} + } // ripple #endif diff --git a/src/ripple/overlay/impl/TMHello.cpp b/src/ripple/overlay/impl/TMHello.cpp index 95345b3d45..46404cca31 100644 --- a/src/ripple/overlay/impl/TMHello.cpp +++ b/src/ripple/overlay/impl/TMHello.cpp @@ -207,6 +207,22 @@ parseHello (beast::http::message const& m, beast::Journal journal) // protocol version in TMHello is obsolete, // it is supplanted by the values in the headers. + { + // Required + auto const iter = h.find ("Upgrade"); + if (iter == h.end()) + return result; + auto const versions = parse_ProtocolVersions(iter->second); + if (versions.empty()) + return result; + hello.set_protoversion( + (static_cast(versions.back().first) << 16) | + (static_cast(versions.back().second))); + hello.set_protoversionmin( + (static_cast(versions.front().first) << 16) | + (static_cast(versions.front().second))); + } + { // Required auto const iter = h.find ("Public-Key"); diff --git a/src/ripple/overlay/impl/TMHello.h b/src/ripple/overlay/impl/TMHello.h index 2f523b3497..a71ea280be 100644 --- a/src/ripple/overlay/impl/TMHello.h +++ b/src/ripple/overlay/impl/TMHello.h @@ -21,6 +21,7 @@ #define RIPPLE_OVERLAY_TMHELLO_H_INCLUDED #include "ripple.pb.h" +#include #include #include #include diff --git a/src/ripple/overlay/tests/short_read.test.cpp b/src/ripple/overlay/tests/short_read.test.cpp index 3b2c6f3a6a..04d25dcb61 100644 --- a/src/ripple/overlay/tests/short_read.test.cpp +++ b/src/ripple/overlay/tests/short_read.test.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include #include #include diff --git a/src/ripple/unity/overlay.cpp b/src/ripple/unity/overlay.cpp index c069589ec1..c5edf04d2f 100644 --- a/src/ripple/unity/overlay.cpp +++ b/src/ripple/unity/overlay.cpp @@ -23,6 +23,7 @@ #include +#include #include #include #include