From 5f59282ba1f34c4a0cf1ab5be2ba58a264544674 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Thu, 28 Aug 2014 14:57:40 -0700 Subject: [PATCH] Clean up Overlay and PeerFinder sources: * Tidy up identifiers and declarations * Merge PeerFinder headers into one file * Merge handout classes and functions into one file --- Builds/VisualStudio2013/RippleD.vcxproj | 25 +- .../VisualStudio2013/RippleD.vcxproj.filters | 32 +- src/ripple/overlay/Peer.h | 5 +- src/ripple/overlay/impl/OverlayImpl.cpp | 2 +- src/ripple/overlay/impl/OverlayImpl.h | 1 - src/ripple/overlay/impl/PeerImp.cpp | 1432 +++++++++++++---- src/ripple/overlay/impl/PeerImp.h | 1065 +++--------- src/ripple/peerfinder/Callback.h | 54 - src/ripple/peerfinder/Config.h | 77 - src/ripple/peerfinder/Endpoint.h | 44 - src/ripple/peerfinder/Manager.h | 112 +- src/ripple/peerfinder/Types.h | 41 - src/ripple/peerfinder/impl/Bootcache.h | 2 +- src/ripple/peerfinder/impl/Config.cpp | 2 +- .../peerfinder/impl/ConnectHandouts.cpp | 64 - src/ripple/peerfinder/impl/ConnectHandouts.h | 79 - src/ripple/peerfinder/impl/Counts.h | 2 +- src/ripple/peerfinder/impl/Endpoint.cpp | 2 +- src/ripple/peerfinder/impl/Handouts.h | 352 ++++ src/ripple/peerfinder/impl/Livecache.h | 3 +- src/ripple/peerfinder/impl/Logic.h | 7 +- .../peerfinder/impl/RedirectHandouts.cpp | 70 - src/ripple/peerfinder/impl/RedirectHandouts.h | 62 - src/ripple/peerfinder/impl/SlotHandouts.cpp | 70 - src/ripple/peerfinder/impl/SlotHandouts.h | 65 - src/ripple/peerfinder/impl/handout.h | 88 - src/ripple/proto/ripple.proto | 7 + src/ripple/unity/peerfinder.cpp | 3 - 28 files changed, 1837 insertions(+), 1931 deletions(-) delete mode 100644 src/ripple/peerfinder/Callback.h delete mode 100644 src/ripple/peerfinder/Config.h delete mode 100644 src/ripple/peerfinder/Endpoint.h delete mode 100644 src/ripple/peerfinder/Types.h delete mode 100644 src/ripple/peerfinder/impl/ConnectHandouts.cpp delete mode 100644 src/ripple/peerfinder/impl/ConnectHandouts.h create mode 100644 src/ripple/peerfinder/impl/Handouts.h delete mode 100644 src/ripple/peerfinder/impl/RedirectHandouts.cpp delete mode 100644 src/ripple/peerfinder/impl/RedirectHandouts.h delete mode 100644 src/ripple/peerfinder/impl/SlotHandouts.cpp delete mode 100644 src/ripple/peerfinder/impl/SlotHandouts.h delete mode 100644 src/ripple/peerfinder/impl/handout.h diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index fe68ca92c4..a64f40de29 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -2832,12 +2832,6 @@ True - - - - - - True @@ -2853,11 +2847,6 @@ True - - True - - - @@ -2865,7 +2854,7 @@ - + @@ -2879,18 +2868,8 @@ True - - True - - - - - True - - - True @@ -2932,8 +2911,6 @@ - - Document protoc --cpp_out=..\..\build\proto --proto_path=%(RelativeDir) %(Identity) diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 12980e2272..00138232b0 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -3978,15 +3978,6 @@ ripple\overlay\tests - - ripple\peerfinder - - - ripple\peerfinder - - - ripple\peerfinder - ripple\peerfinder\impl @@ -4005,12 +3996,6 @@ ripple\peerfinder\impl - - ripple\peerfinder\impl - - - ripple\peerfinder\impl - ripple\peerfinder\impl @@ -4020,7 +4005,7 @@ ripple\peerfinder\impl - + ripple\peerfinder\impl @@ -4038,21 +4023,9 @@ ripple\peerfinder\impl - - ripple\peerfinder\impl - - - ripple\peerfinder\impl - ripple\peerfinder\impl - - ripple\peerfinder\impl - - - ripple\peerfinder\impl - ripple\peerfinder\impl @@ -4110,9 +4083,6 @@ ripple\peerfinder - - ripple\peerfinder - ripple\proto diff --git a/src/ripple/overlay/Peer.h b/src/ripple/overlay/Peer.h index d995e3379c..7ec64c3d45 100644 --- a/src/ripple/overlay/Peer.h +++ b/src/ripple/overlay/Peer.h @@ -60,8 +60,11 @@ public: virtual ShortId getShortId () const = 0; virtual RippleAddress const& getNodePublic () const = 0; virtual Json::Value json () = 0; + // VFALCO TODO Replace both with + // boost::optional const& cluster_id(); + // virtual bool isInCluster () const = 0; - virtual std::string getClusterNodeName() const = 0; + virtual std::string const& getClusterNodeName() const = 0; // // Ledger diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index f079621024..d8bc765322 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -539,7 +539,7 @@ OverlayImpl::getActivePeers () ret.reserve (m_publicKeyMap.size ()); - BOOST_FOREACH (PeerByPublicKey::value_type const& pair, m_publicKeyMap) + for (auto const& pair : m_publicKeyMap) { assert (pair.second); ret.push_back (pair.second); diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 8e6673bc64..ce9e3b0227 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -25,7 +25,6 @@ #include #include #include -#include #include #include diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 049c4a0728..5a00cca4f9 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -24,20 +24,272 @@ namespace ripple { +PeerImp::PeerImp (NativeSocketType&& socket, beast::IP::Endpoint remoteAddress, + OverlayImpl& overlay, Resource::Manager& resourceManager, + PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot, + boost::asio::ssl::context& ssl_context, MultiSocket::Flag flags) + : m_owned_socket (std::move (socket)) + , journal_ (deprecatedLogs().journal("Peer")) + , remote_address_ (remoteAddress) + , resourceManager_ (resourceManager) + , peerFinder_ (peerFinder) + , overlay_ (overlay) + , m_inbound (true) + , socket_ (MultiSocket::New ( + m_owned_socket, ssl_context, flags.asBits ())) + , strand_ (m_owned_socket.get_io_service()) + , state_ (stateConnected) + , minLedger_ (0) + , maxLedger_ (0) + , timer_ (m_owned_socket.get_io_service()) + , slot_ (slot) + , message_stream_(*this) +{ +} + +PeerImp::PeerImp (beast::IP::Endpoint remoteAddress, + boost::asio::io_service& io_service, OverlayImpl& overlay, + Resource::Manager& resourceManager, PeerFinder::Manager& peerFinder, + PeerFinder::Slot::ptr const& slot, + boost::asio::ssl::context& ssl_context, MultiSocket::Flag flags) + : m_owned_socket (io_service) + , journal_ (deprecatedLogs().journal("Peer")) + , remote_address_ (remoteAddress) + , resourceManager_ (resourceManager) + , peerFinder_ (peerFinder) + , overlay_ (overlay) + , m_inbound (false) + , socket_ (MultiSocket::New ( + io_service, ssl_context, flags.asBits ())) + , strand_ (io_service) + , state_ (stateConnecting) + , minLedger_ (0) + , maxLedger_ (0) + , timer_ (io_service) + , slot_ (slot) + , message_stream_(*this) +{ +} + +PeerImp::~PeerImp () +{ + overlay_.remove (slot_); +} + +void +PeerImp::start () +{ + if (m_inbound) + do_accept (); + else + do_connect (); +} + +void +PeerImp::activate () +{ + assert (state_ == stateHandshaked); + state_ = stateActive; + assert(shortId_ == 0); + shortId_ = overlay_.next_id(); + overlay_.onPeerActivated(shared_from_this ()); +} + +void +PeerImp::close (bool graceful) +{ + was_canceled_ = true; + detach ("stop", graceful); +} + //------------------------------------------------------------------------------ -// TODO Make these class members or something. - -static void -sGetLedger (std::weak_ptr wPeer, - std::shared_ptr packet); +PeerImp::send (Message::pointer const& m) +{ + // VFALCO NOTE why call this with null? + if (! m) + return; + + if (! strand_.running_in_this_thread()) + { + strand_.post (std::bind (&PeerImp::send, shared_from_this(), m)); + return; + } + + if (send_packet_) + send_queue_.push_back (m); + else + sendForce (m); +} + +beast::IP::Endpoint +PeerImp::getRemoteAddress() const +{ + return remote_address_; +} -static void -peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, - std::shared_ptr pPacket, - beast::Journal journal); +PeerImp::charge (Resource::Charge const& fee) +{ + if ((usage_.charge (fee) == Resource::drop) && usage_.disconnect ()) + detach ("resource"); +} + +//------------------------------------------------------------------------------ + +Peer::ShortId +PeerImp::getShortId () const +{ + return shortId_; +} + +RippleAddress const& +PeerImp::getNodePublic () const +{ + return publicKey_; +} + +Json::Value +PeerImp::json() +{ + Json::Value ret (Json::objectValue); + + ret["public_key"] = publicKey_.ToString (); + ret["address"] = remote_address_.to_string(); + + if (m_inbound) + ret["inbound"] = true; + + if (clusterNode_) + { + ret["cluster"] = true; + + if (!name_.empty ()) + ret["name"] = name_; + } + + if (hello_.has_fullversion ()) + ret["version"] = hello_.fullversion (); + + if (hello_.has_protoversion ()) + { + auto protocol = BuildInfo::make_protocol (hello_.protoversion ()); + + if (protocol != BuildInfo::getCurrentProtocol()) + ret["protocol"] = to_string (protocol); + } + + std::uint32_t minSeq, maxSeq; + ledgerRange(minSeq, maxSeq); + + if ((minSeq != 0) || (maxSeq != 0)) + ret["complete_ledgers"] = boost::lexical_cast(minSeq) + + " - " + boost::lexical_cast(maxSeq); + + if (closedLedgerHash_ != zero) + ret["ledger"] = to_string (closedLedgerHash_); + + if (last_status_.has_newstatus ()) + { + switch (last_status_.newstatus ()) + { + case protocol::nsCONNECTING: + ret["status"] = "connecting"; + break; + + case protocol::nsCONNECTED: + ret["status"] = "connected"; + break; + + case protocol::nsMONITORING: + ret["status"] = "monitoring"; + break; + + case protocol::nsVALIDATING: + ret["status"] = "validating"; + break; + + case protocol::nsSHUTTING: + ret["status"] = "shutting"; + break; + + default: + // FIXME: do we really want this? + journal_.warning << + "Unknown status: " << last_status_.newstatus (); + } + } + + return ret; +} + +bool +PeerImp::isInCluster () const +{ + return clusterNode_; +} + +std::string const& +PeerImp::getClusterNodeName() const +{ + return name_; +} + +//------------------------------------------------------------------------------ + +uint256 const& +PeerImp::getClosedLedgerHash () const +{ + return closedLedgerHash_; +} + +bool +PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const +{ + std::lock_guard sl(recentLock_); + if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_)) + return true; + return std::find (recentLedgers_.begin(), + recentLedgers_.end(), hash) != recentLedgers_.end(); +} + +void +PeerImp::ledgerRange (std::uint32_t& minSeq, + std::uint32_t& maxSeq) const +{ + std::lock_guard sl(recentLock_); + + minSeq = minLedger_; + maxSeq = maxLedger_; +} + +bool +PeerImp::hasTxSet (uint256 const& hash) const +{ + std::lock_guard sl(recentLock_); + return std::find (recentTxSets_.begin(), + recentTxSets_.end(), hash) != recentTxSets_.end(); +} + +void +PeerImp::cycleStatus () +{ + previousLedgerHash_ = closedLedgerHash_; + closedLedgerHash_.zero (); +} + +bool +PeerImp::supportsVersion (int version) +{ + return hello_.has_protoversion () && (hello_.protoversion () >= version); +} + +bool +PeerImp::hasRange (std::uint32_t uMin, std::uint32_t uMax) +{ + return (uMin >= minLedger_) && (uMax <= maxLedger_); +} //------------------------------------------------------------------------------ @@ -52,11 +304,11 @@ peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, void PeerImp::do_connect () { - m_journal.info << "Connecting to " << m_remoteAddress; + journal_.info << "Connecting to " << remote_address_; - m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress); + usage_ = resourceManager_.newOutboundEndpoint (remote_address_); - if (m_usage.disconnect ()) + if (usage_.disconnect ()) { detach ("do_connect"); return; @@ -64,51 +316,51 @@ void PeerImp::do_connect () boost::system::error_code ec; timer_.expires_from_now (nodeVerifySeconds, ec); - timer_.async_wait (m_strand.wrap (std::bind (&PeerImp::handleVerifyTimer, + timer_.async_wait (strand_.wrap (std::bind (&PeerImp::handleVerifyTimer, shared_from_this (), beast::asio::placeholders::error))); if (ec) { - m_journal.error << "Failed to set verify timer."; + journal_.error << "Failed to set verify timer."; detach ("do_connect"); return; } - m_socket->next_layer ().async_connect ( - beast::IPAddressConversion::to_asio_endpoint (m_remoteAddress), - m_strand.wrap (std::bind (&PeerImp::on_connect, + socket_->next_layer ().async_connect ( + beast::IPAddressConversion::to_asio_endpoint (remote_address_), + strand_.wrap (std::bind (&PeerImp::on_connect, shared_from_this (), beast::asio::placeholders::error))); } void PeerImp::on_connect (error_code ec) { - if (m_detaching || ec == boost::asio::error::operation_aborted) + if (detaching_ || ec == boost::asio::error::operation_aborted) return; NativeSocketType::endpoint_type local_endpoint; if (! ec) - local_endpoint = m_socket->this_layer < + local_endpoint = socket_->this_layer < NativeSocketType> ().local_endpoint (ec); if (ec) { - m_journal.error << - "Connect to " << m_remoteAddress << + journal_.error << + "Connect to " << remote_address_ << " failed: " << ec.message(); detach ("hc"); return; } - assert (m_state == stateConnecting); - m_state = stateConnected; + assert (state_ == stateConnecting); + state_ = stateConnected; - m_peerFinder.on_connected (m_slot, + peerFinder_.on_connected (slot_, beast::IPAddressConversion::from_asio (local_endpoint)); - m_socket->set_verify_mode (boost::asio::ssl::verify_none); - m_socket->async_handshake ( + socket_->set_verify_mode (boost::asio::ssl::verify_none); + socket_->async_handshake ( boost::asio::ssl::stream_base::client, - m_strand.wrap (std::bind (&PeerImp::on_connect_ssl, + strand_.wrap (std::bind (&PeerImp::on_connect_ssl, std::static_pointer_cast (shared_from_this ()), beast::asio::placeholders::error))); } @@ -122,8 +374,8 @@ PeerImp::make_request() m.url ("/"); m.version (1, 1); m.headers.append ("User-Agent", BuildInfo::getFullVersionString()); - //m.headers.append ("Local-Address", m_socket-> - m.headers.append ("Remote-Address", m_remoteAddress.to_string()); + //m.headers.append ("Local-Address", socket_-> + m.headers.append ("Remote-Address", remote_address_.to_string()); m.headers.append ("Upgrade", std::string("Ripple/") + to_string (BuildInfo::getCurrentProtocol())); m.headers.append ("Connection", "Upgrade"); @@ -139,12 +391,12 @@ PeerImp::make_request() void PeerImp::on_connect_ssl (error_code ec) { - if (m_detaching || ec == boost::asio::error::operation_aborted) + if (detaching_ || ec == boost::asio::error::operation_aborted) return; if (ec) { - m_journal.info << + journal_.info << "on_connect_ssl: " << ec.message(); detach("on_connect_ssl"); return; @@ -165,12 +417,12 @@ PeerImp::on_connect_ssl (error_code ec) void PeerImp::on_write_http_request (error_code ec, std::size_t bytes_transferred) { - if (m_detaching || ec == boost::asio::error::operation_aborted) + if (detaching_ || ec == boost::asio::error::operation_aborted) return; if (ec) { - m_journal.info << + journal_.info << "on_write_http_request: " << ec.message(); detach("on_write_http_request"); return; @@ -187,8 +439,8 @@ PeerImp::on_write_http_request (error_code ec, std::size_t bytes_transferred) return; } - m_socket->async_write_some (write_buffer_.data(), - m_strand.wrap (std::bind (&PeerImp::on_write_http_request, + socket_->async_write_some (write_buffer_.data(), + strand_.wrap (std::bind (&PeerImp::on_write_http_request, shared_from_this(), beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred))); } @@ -197,7 +449,7 @@ PeerImp::on_write_http_request (error_code ec, std::size_t bytes_transferred) void PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred) { - if (m_detaching || ec == boost::asio::error::operation_aborted) + if (detaching_ || ec == boost::asio::error::operation_aborted) return; if (! ec) @@ -205,7 +457,8 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred) read_buffer_.commit (bytes_transferred); bool success; std::size_t bytes_consumed; - std::tie (success, bytes_consumed) = http_parser_->write (read_buffer_.data()); + std::tie (success, bytes_consumed) = http_parser_->write ( + read_buffer_.data()); if (! success) ec = http_parser_->error(); @@ -222,7 +475,7 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred) // if (http_message_->status() != 200) { - m_journal.info << + journal_.info << "HTTP Response: " << http_message_->reason() << "(" << http_message_->status() << ")"; detach("on_read_http_response"); @@ -242,14 +495,14 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred) if (ec) { - m_journal.info << + journal_.info << "on_read_response: " << ec.message(); detach("on_read_response"); return; } - m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - m_strand.wrap (std::bind (&PeerImp::on_read_http_response, + socket_->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), + strand_.wrap (std::bind (&PeerImp::on_read_http_response, shared_from_this(), beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred))); } @@ -265,18 +518,18 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred) */ void PeerImp::do_accept () { - m_journal.info << "Accepted " << m_remoteAddress; + journal_.info << "Accepted " << remote_address_; - m_usage = m_resourceManager.newInboundEndpoint (m_remoteAddress); - if (m_usage.disconnect ()) + usage_ = resourceManager_.newInboundEndpoint (remote_address_); + if (usage_.disconnect ()) { detach ("do_accept"); return; } - m_socket->set_verify_mode (boost::asio::ssl::verify_none); - m_socket->async_handshake (boost::asio::ssl::stream_base::server, - m_strand.wrap (std::bind (&PeerImp::on_accept_ssl, + socket_->set_verify_mode (boost::asio::ssl::verify_none); + socket_->async_handshake (boost::asio::ssl::stream_base::server, + strand_.wrap (std::bind (&PeerImp::on_accept_ssl, std::static_pointer_cast (shared_from_this ()), beast::asio::placeholders::error))); } @@ -284,12 +537,12 @@ void PeerImp::do_accept () void PeerImp::on_accept_ssl (error_code ec) { - if (m_detaching || ec == boost::asio::error::operation_aborted) + if (detaching_ || ec == boost::asio::error::operation_aborted) return; if (ec) { - m_journal.info << + journal_.info << "on_accept_ssl: " << ec.message(); detach("on_accept_ssl"); return; @@ -308,12 +561,12 @@ PeerImp::on_accept_ssl (error_code ec) void PeerImp::on_read_http_detect (error_code ec, std::size_t bytes_transferred) { - if (m_detaching || ec == boost::asio::error::operation_aborted) + if (detaching_ || ec == boost::asio::error::operation_aborted) return; if (ec) { - m_journal.info << + journal_.info << "on_read_detect: " << ec.message(); detach("on_read_detect"); return; @@ -336,8 +589,8 @@ PeerImp::on_read_http_detect (error_code ec, std::size_t bytes_transferred) return; } - m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - m_strand.wrap (std::bind (&PeerImp::on_read_http_detect, + socket_->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), + strand_.wrap (std::bind (&PeerImp::on_read_http_detect, shared_from_this(), beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred))); } @@ -346,7 +599,7 @@ PeerImp::on_read_http_detect (error_code ec, std::size_t bytes_transferred) void PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred) { - if (m_detaching || ec == boost::asio::error::operation_aborted) + if (detaching_ || ec == boost::asio::error::operation_aborted) return; if (! ec) @@ -354,7 +607,8 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred) read_buffer_.commit (bytes_transferred); bool success; std::size_t bytes_consumed; - std::tie (success, bytes_consumed) = http_parser_->write (read_buffer_.data()); + std::tie (success, bytes_consumed) = http_parser_->write ( + read_buffer_.data()); if (! success) ec = http_parser_->error(); @@ -399,14 +653,14 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred) if (ec) { - m_journal.info << + journal_.info << "on_read_http_request: " << ec.message(); detach("on_read_http_request"); return; } - m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - m_strand.wrap (std::bind (&PeerImp::on_read_http_request, + socket_->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), + strand_.wrap (std::bind (&PeerImp::on_read_http_request, shared_from_this(), beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred))); } @@ -423,12 +677,12 @@ PeerImp::make_response (beast::http::message const& req) void PeerImp::on_write_http_response (error_code ec, std::size_t bytes_transferred) { - if (m_detaching || ec == boost::asio::error::operation_aborted) + if (detaching_ || ec == boost::asio::error::operation_aborted) return; if (ec) { - m_journal.info << + journal_.info << "on_write_http_response: " << ec.message(); detach("on_write_http_response"); return; @@ -442,8 +696,8 @@ PeerImp::on_write_http_response (error_code ec, std::size_t bytes_transferred) return; } - m_socket->async_write_some (write_buffer_.data(), - m_strand.wrap (std::bind (&PeerImp::on_write_http_response, + socket_->async_write_some (write_buffer_.data(), + strand_.wrap (std::bind (&PeerImp::on_write_http_response, shared_from_this(), beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred))); } @@ -462,7 +716,7 @@ PeerImp::do_protocol_start () { if (!sendHello ()) { - m_journal.error << "Unable to send HELLO to " << m_remoteAddress; + journal_.error << "Unable to send HELLO to " << remote_address_; detach ("hello"); return; } @@ -474,7 +728,7 @@ PeerImp::do_protocol_start () void PeerImp::on_read_protocol (error_code ec, std::size_t bytes_transferred) { - if (m_detaching || ec == boost::asio::error::operation_aborted) + if (detaching_ || ec == boost::asio::error::operation_aborted) return; if (! ec) @@ -486,14 +740,14 @@ PeerImp::on_read_protocol (error_code ec, std::size_t bytes_transferred) if (ec) { - m_journal.info << + journal_.info << "on_read_protocol: " << ec.message(); detach("on_read_protocol"); return; } - m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - m_strand.wrap (std::bind (&PeerImp::on_read_protocol, + socket_->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), + strand_.wrap (std::bind (&PeerImp::on_read_protocol, shared_from_this(), beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred))); } @@ -504,10 +758,84 @@ PeerImp::on_write_protocol (error_code ec, std::size_t bytes_transferred) { // (this function isn't called yet) - if (m_detaching || ec == boost::asio::error::operation_aborted) + if (detaching_ || ec == boost::asio::error::operation_aborted) return; } +void +PeerImp::handleShutdown (boost::system::error_code const& ec) +{ + if (detaching_) + return; + + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + journal_.info << "Shutdown: " << ec.message (); + detach ("hsd"); + return; + } +} + +void +PeerImp::handleWrite (boost::system::error_code const& ec, size_t bytes) +{ + if (detaching_) + return; + + // Call on IO strand + + send_packet_.reset (); + + if (ec == boost::asio::error::operation_aborted) + return; + + if (detaching_) + return; + + if (ec) + { + journal_.info << "Write: " << ec.message (); + detach ("hw"); + return; + } + + if (!send_queue_.empty ()) + { + Message::pointer packet = send_queue_.front (); + + if (packet) + { + sendForce (packet); + send_queue_.pop_front (); + } + } +} + +void +PeerImp::handleVerifyTimer (boost::system::error_code const& ec) +{ + if (detaching_) + return; + + if (ec == boost::asio::error::operation_aborted) + { + // Timer canceled because deadline no longer needed. + } + else if (ec) + { + journal_.info << "Peer verify timer error"; + } + else + { + // journal_.info << "Verify: Peer failed to verify in time."; + + detach ("hvt"); + } +} + //------------------------------------------------------------------------------ // // abstract_protocol_handler @@ -533,15 +861,15 @@ PeerImp::on_message_begin (std::uint16_t type, log << m->DebugString(); #endif - if (type == protocol::mtHELLO && m_state != stateConnected) + if (type == protocol::mtHELLO && state_ != stateConnected) { - m_journal.warning << + journal_.warning << "Unexpected TMHello"; ec = invalid_argument_error(); } - else if (type != protocol::mtHELLO && m_state == stateConnected) + else if (type != protocol::mtHELLO && state_ == stateConnected) { - m_journal.warning << + journal_.warning << "Expected TMHello"; ec = invalid_argument_error(); } @@ -580,7 +908,7 @@ PeerImp::on_message (std::shared_ptr const& m) { std::int64_t to = ourTime; to -= m->nettime (); - m_journal.debug << + journal_.debug << "Connect: time offset " << to; } #endif @@ -592,83 +920,83 @@ PeerImp::on_message (std::shared_ptr const& m) { if (m->nettime () > maxTime) { - m_journal.info << + journal_.info << "Hello: Clock for " << *this << " is off by +" << m->nettime () - ourTime; } else if (m->nettime () < minTime) { - m_journal.info << + journal_.info << "Hello: Clock for " << *this << " is off by -" << ourTime - m->nettime (); } } else if (m->protoversionmin () > to_packed (BuildInfo::getCurrentProtocol())) { - m_journal.info << + journal_.info << "Hello: Disconnect: Protocol mismatch [" << "Peer expects " << to_string (protocol) << " and we run " << to_string (BuildInfo::getCurrentProtocol()) << "]"; } - else if (! m_nodePublicKey.setNodePublic (m->nodepublic ())) + else if (! publicKey_.setNodePublic (m->nodepublic ())) { - m_journal.info << + journal_.info << "Hello: Disconnect: Bad node public key."; } - else if (! m_nodePublicKey.verifyNodePublic ( - m_secureCookie, m->nodeproof (), ECDSA::not_strict)) + else if (! publicKey_.verifyNodePublic ( + secureCookie_, m->nodeproof (), ECDSA::not_strict)) { // Unable to verify they have private key for claimed public key. - m_journal.info << + journal_.info << "Hello: Disconnect: Failed to verify session."; } else { // Successful connection. - m_journal.info << - "Hello: Connect: " << m_nodePublicKey.humanNodePublic (); + journal_.info << + "Hello: Connect: " << publicKey_.humanNodePublic (); if ((protocol != BuildInfo::getCurrentProtocol()) && - m_journal.active(beast::Journal::Severity::kInfo)) + journal_.active(beast::Journal::Severity::kInfo)) { - m_journal.info << + journal_.info << "Peer protocol: " << to_string (protocol); } - mHello = *m; + hello_ = *m; // Determine if this peer belongs to our cluster and get it's name - m_clusterNode = getApp().getUNL().nodeInCluster ( - m_nodePublicKey, m_nodeName); + clusterNode_ = getApp().getUNL().nodeInCluster ( + publicKey_, name_); - if (m_clusterNode) - m_journal.info << - "Connected to cluster node " << m_nodeName; + if (clusterNode_) + journal_.info << + "Connected to cluster node " << name_; - assert (m_state == stateConnected); - m_state = stateHandshaked; + assert (state_ == stateConnected); + state_ = stateHandshaked; - m_peerFinder.on_handshake (m_slot, RipplePublicKey (m_nodePublicKey), - m_clusterNode); + peerFinder_.on_handshake (slot_, RipplePublicKey (publicKey_), + clusterNode_); // XXX Set timer: connection is in grace period to be useful. // XXX Set timer: connection idle (idle may vary depending on connection type.) - if ((mHello.has_ledgerclosed ()) && ( - mHello.ledgerclosed ().size () == (256 / 8))) + if ((hello_.has_ledgerclosed ()) && ( + hello_.ledgerclosed ().size () == (256 / 8))) { - memcpy (m_closedLedgerHash.begin (), - mHello.ledgerclosed ().data (), 256 / 8); + memcpy (closedLedgerHash_.begin (), + hello_.ledgerclosed ().data (), 256 / 8); - if ((mHello.has_ledgerprevious ()) && - (mHello.ledgerprevious ().size () == (256 / 8))) + if ((hello_.has_ledgerprevious ()) && + (hello_.ledgerprevious ().size () == (256 / 8))) { - memcpy (m_previousLedgerHash.begin (), - mHello.ledgerprevious ().data (), 256 / 8); - addLedger (m_previousLedgerHash); + memcpy (previousLedgerHash_.begin (), + hello_.ledgerprevious ().data (), 256 / 8); + addLedger (previousLedgerHash_); } else { - m_previousLedgerHash.zero (); + previousLedgerHash_.zero (); } } @@ -677,7 +1005,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (bDetach) { - //m_nodePublicKey.clear (); + //publicKey_.clear (); //detach ("recvh"); ec = invalid_argument_error(); @@ -719,7 +1047,8 @@ PeerImp::on_message (std::shared_ptr const& m) memcpy (response.begin (), m->response ().data (), 256 / 8); // VFALCO TODO Use a dependency injection here - PowResult r = getApp().getProofOfWorkFactory ().checkProof (m->token (), response); + PowResult r = getApp().getProofOfWorkFactory ().checkProof ( + m->token (), response); if (r == powOK) { @@ -751,7 +1080,8 @@ PeerImp::on_message (std::shared_ptr const& m) uint256 challenge, target; - if ((m->challenge ().size () != (256 / 8)) || (m->target ().size () != (256 / 8))) + if ((m->challenge ().size () != (256 / 8)) || ( + m->target ().size () != (256 / 8))) { charge (Resource::feeInvalidRequest); return ec; @@ -759,8 +1089,8 @@ PeerImp::on_message (std::shared_ptr const& m) memcpy (challenge.begin (), m->challenge ().data (), 256 / 8); memcpy (target.begin (), m->target ().data (), 256 / 8); - ProofOfWork::pointer pow = std::make_shared (m->token (), m->iterations (), - challenge, target); + ProofOfWork::pointer pow = std::make_shared ( + m->token (), m->iterations (), challenge, target); if (!pow->isValid ()) { @@ -779,7 +1109,7 @@ PeerImp::on_message (std::shared_ptr const& m) return ec; } - m_journal.info << "Received in valid proof of work object from peer"; + journal_.info << "Received in valid proof of work object from peer"; return ec; } @@ -788,7 +1118,7 @@ PeerImp::error_code PeerImp::on_message (std::shared_ptr const& m) { error_code ec; - if (!m_clusterNode) + if (!clusterNode_) { charge (Resource::feeUnwantedData); return ec; @@ -823,7 +1153,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (item.address != beast::IP::Endpoint()) gossip.items.push_back(item); } - m_resourceManager.importConsumers (m_nodeName, gossip); + resourceManager_.importConsumers (name_, gossip); } getApp().getFeeTrack().setClusterFee(getApp().getUNL().getClusterFee()); @@ -859,7 +1189,7 @@ PeerImp::on_message (std::shared_ptr const& m) } if (! list.empty()) - m_peerFinder.on_legacy_endpoints (list); + peerFinder_.on_legacy_endpoints (list); return ec; } @@ -895,7 +1225,7 @@ PeerImp::on_message (std::shared_ptr const& m) // then we'll verify that their listener can receive incoming // by performing a connectivity test. // - endpoint.address = m_remoteAddress.at_port ( + endpoint.address = remote_address_.at_port ( tm.ipv4().ipv4port ()); } @@ -903,7 +1233,7 @@ PeerImp::on_message (std::shared_ptr const& m) } if (! endpoints.empty()) - m_peerFinder.on_endpoints (m_slot, endpoints); + peerFinder_.on_endpoints (slot_, endpoints); return ec; } @@ -916,12 +1246,14 @@ PeerImp::on_message (std::shared_ptr const& m) try { SerializerIterator sit (s); - SerializedTransaction::pointer stx = std::make_shared (std::ref (sit)); + SerializedTransaction::pointer stx = std::make_shared < + SerializedTransaction> (std::ref (sit)); uint256 txID = stx->getTransactionID(); int flags; - if (! getApp().getHashRouter ().addSuppressionPeer (txID, m_shortId, flags)) + if (! getApp().getHashRouter ().addSuppressionPeer ( + txID, shortId_, flags)) { // we have seen this transaction recently if (flags & SF_BAD) @@ -934,9 +1266,10 @@ PeerImp::on_message (std::shared_ptr const& m) return ec; } - m_journal.debug << "Got transaction from peer " << *this << ": " << txID; + journal_.debug << + "Got transaction from peer " << *this << ": " << txID; - if (m_clusterNode) + if (clusterNode_) { flags |= SF_TRUSTED; if (! getConfig().VALIDATION_PRIV.isSet()) @@ -948,9 +1281,9 @@ PeerImp::on_message (std::shared_ptr const& m) } if (getApp().getJobQueue().getJobCount(jtTRANSACTION) > 100) - m_journal.info << "Transaction queue is full"; + journal_.info << "Transaction queue is full"; else if (getApp().getLedgerMaster().getValidatedLedgerAge() > 240) - m_journal.trace << "No new transactions until synchronized"; + journal_.trace << "No new transactions until synchronized"; else getApp().getJobQueue ().addJob (jtTRANSACTION, "recvTransaction->checkTransaction", @@ -962,7 +1295,7 @@ PeerImp::on_message (std::shared_ptr const& m) } catch (...) { - m_journal.warning << "Transaction invalid: " << + journal_.warning << "Transaction invalid: " << s.getHex(); } return ec; @@ -973,7 +1306,8 @@ PeerImp::on_message (std::shared_ptr const& m) { error_code ec; getApp().getJobQueue().addJob (jtPACK, "recvGetLedger", - std::bind (&sGetLedger, std::weak_ptr (shared_from_this ()), m)); + std::bind (&PeerImp::sGetLedger, std::weak_ptr ( + shared_from_this ()), m)); return ec; } @@ -985,22 +1319,23 @@ PeerImp::on_message (std::shared_ptr const& m) if (m->nodes ().size () <= 0) { - m_journal.warning << "Ledger/TXset data with no nodes"; + journal_.warning << "Ledger/TXset data with no nodes"; return ec; } if (m->has_requestcookie ()) { - Peer::ptr target = m_overlay.findPeerByShortID (m->requestcookie ()); + Peer::ptr target = overlay_.findPeerByShortID (m->requestcookie ()); if (target) { m->clear_requestcookie (); - target->send (std::make_shared (packet, protocol::mtLEDGER_DATA)); + target->send (std::make_shared ( + packet, protocol::mtLEDGER_DATA)); } else { - m_journal.info << "Unable to route TX/ledger data reply"; + journal_.info << "Unable to route TX/ledger data reply"; charge (Resource::feeUnwantedData); } @@ -1011,7 +1346,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (m->ledgerhash ().size () != 32) { - m_journal.warning << "TX candidate reply with invalid hash size"; + journal_.warning << "TX candidate reply with invalid hash size"; charge (Resource::feeInvalidRequest); return ec; } @@ -1023,16 +1358,17 @@ PeerImp::on_message (std::shared_ptr const& m) // got data for a candidate transaction set getApp().getJobQueue().addJob (jtTXN_DATA, "recvPeerData", - std::bind (&peerTXData, std::placeholders::_1, + std::bind (&PeerImp::peerTXData, std::placeholders::_1, std::weak_ptr (shared_from_this ()), - hash, m, m_journal)); + hash, m, journal_)); return ec; } - if (!getApp().getInboundLedgers ().gotLedgerData (hash, shared_from_this(), m)) + if (!getApp().getInboundLedgers ().gotLedgerData ( + hash, shared_from_this(), m)) { - m_journal.trace << "Got data for unwanted ledger"; + journal_.trace << "Got data for unwanted ledger"; charge (Resource::feeUnwantedData); } return ec; @@ -1050,17 +1386,22 @@ PeerImp::on_message (std::shared_ptr const& m) // VFALCO Magic numbers are bad // Roll this into a validation function - if ((set.currenttxhash ().size () != 32) || (set.nodepubkey ().size () < 28) || - (set.signature ().size () < 56) || (set.nodepubkey ().size () > 128) || (set.signature ().size () > 128)) + if ( + (set.currenttxhash ().size () != 32) || + (set.nodepubkey ().size () < 28) || + (set.signature ().size () < 56) || + (set.nodepubkey ().size () > 128) || + (set.signature ().size () > 128) + ) { - m_journal.warning << "Received proposal is malformed"; + journal_.warning << "Received proposal is malformed"; charge (Resource::feeInvalidSignature); return ec; } if (set.has_previousledger () && (set.previousledger ().size () != 32)) { - m_journal.warning << "Received proposal is malformed"; + journal_.warning << "Received proposal is malformed"; charge (Resource::feeInvalidRequest); return ec; } @@ -1071,34 +1412,39 @@ PeerImp::on_message (std::shared_ptr const& m) if (set.has_previousledger ()) memcpy (prevLedger.begin (), set.previousledger ().data (), 32); - uint256 suppression = LedgerProposal::computeSuppressionID (proposeHash, prevLedger, - set.proposeseq(), set.closetime (), - Blob(set.nodepubkey ().begin (), set.nodepubkey ().end ()), - Blob(set.signature ().begin (), set.signature ().end ())); + uint256 suppression = LedgerProposal::computeSuppressionID ( + proposeHash, prevLedger, set.proposeseq(), set.closetime (), + Blob(set.nodepubkey ().begin (), set.nodepubkey ().end ()), + Blob(set.signature ().begin (), set.signature ().end ())); - if (! getApp().getHashRouter ().addSuppressionPeer (suppression, m_shortId)) + if (! getApp().getHashRouter ().addSuppressionPeer ( + suppression, shortId_)) { - m_journal.trace << "Received duplicate proposal from peer " << m_shortId; + journal_.trace << + "Received duplicate proposal from peer " << shortId_; return ec; } - RippleAddress signerPublic = RippleAddress::createNodePublic (strCopy (set.nodepubkey ())); + RippleAddress signerPublic = RippleAddress::createNodePublic ( + strCopy (set.nodepubkey ())); if (signerPublic == getConfig ().VALIDATION_PUB) { - m_journal.trace << "Received our own proposal from peer " << m_shortId; + journal_.trace << + "Received our own proposal from peer " << shortId_; return ec; } bool isTrusted = getApp().getUNL ().nodeInUNL (signerPublic); if (!isTrusted && getApp().getFeeTrack ().isLoadedLocal ()) { - m_journal.debug << "Dropping UNTRUSTED proposal due to load"; + journal_.debug << "Dropping UNTRUSTED proposal due to load"; return ec; } - m_journal.trace << "Received " << (isTrusted ? "trusted" : "UNTRUSTED") << - " proposal from " << m_shortId; + journal_.trace << + "Received " << (isTrusted ? "trusted" : "UNTRUSTED") << + " proposal from " << shortId_; uint256 consensusLCL; @@ -1109,13 +1455,15 @@ PeerImp::on_message (std::shared_ptr const& m) LedgerProposal::pointer proposal = std::make_shared ( prevLedger.isNonZero () ? prevLedger : consensusLCL, - set.proposeseq (), proposeHash, set.closetime (), signerPublic, suppression); + set.proposeseq (), proposeHash, set.closetime (), + signerPublic, suppression); getApp().getJobQueue ().addJob (isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "recvPropose->checkPropose", std::bind ( - &PeerImp::checkPropose, std::placeholders::_1, &m_overlay, - m, proposal, consensusLCL, m_nodePublicKey, - std::weak_ptr (shared_from_this ()), m_clusterNode)); + &PeerImp::checkPropose, std::placeholders::_1, &overlay_, + m, proposal, consensusLCL, publicKey_, + std::weak_ptr (shared_from_this ()), clusterNode_, + journal_)); return ec; } @@ -1123,65 +1471,67 @@ PeerImp::error_code PeerImp::on_message (std::shared_ptr const& m) { error_code ec; - m_journal.trace << "Received status change from peer " << + journal_.trace << "Received status change from peer " << to_string (this); if (!m->has_networktime ()) m->set_networktime (getApp().getOPs ().getNetworkTimeNC ()); - if (!mLastStatus.has_newstatus () || m->has_newstatus ()) - mLastStatus = *m; + if (!last_status_.has_newstatus () || m->has_newstatus ()) + last_status_ = *m; else { // preserve old status - protocol::NodeStatus status = mLastStatus.newstatus (); - mLastStatus = *m; + protocol::NodeStatus status = last_status_.newstatus (); + last_status_ = *m; m->set_newstatus (status); } if (m->newevent () == protocol::neLOST_SYNC) { - if (!m_closedLedgerHash.isZero ()) + if (!closedLedgerHash_.isZero ()) { - m_journal.trace << "peer has lost sync " << to_string (this); - m_closedLedgerHash.zero (); + journal_.trace << "peer has lost sync " << to_string (this); + closedLedgerHash_.zero (); } - m_previousLedgerHash.zero (); + previousLedgerHash_.zero (); return ec; } if (m->has_ledgerhash () && (m->ledgerhash ().size () == (256 / 8))) { // a peer has changed ledgers - memcpy (m_closedLedgerHash.begin (), m->ledgerhash ().data (), 256 / 8); - addLedger (m_closedLedgerHash); - m_journal.trace << "peer LCL is " << m_closedLedgerHash << + memcpy (closedLedgerHash_.begin (), m->ledgerhash ().data (), 256 / 8); + addLedger (closedLedgerHash_); + journal_.trace << "peer LCL is " << closedLedgerHash_ << " " << to_string (this); } else { - m_journal.trace << "peer has no ledger hash" << to_string (this); - m_closedLedgerHash.zero (); + journal_.trace << "peer has no ledger hash" << to_string (this); + closedLedgerHash_.zero (); } - if (m->has_ledgerhashprevious () && m->ledgerhashprevious ().size () == (256 / 8)) + if (m->has_ledgerhashprevious () && + m->ledgerhashprevious ().size () == (256 / 8)) { - memcpy (m_previousLedgerHash.begin (), m->ledgerhashprevious ().data (), 256 / 8); - addLedger (m_previousLedgerHash); + memcpy (previousLedgerHash_.begin (), + m->ledgerhashprevious ().data (), 256 / 8); + addLedger (previousLedgerHash_); } - else m_previousLedgerHash.zero (); + else previousLedgerHash_.zero (); if (m->has_firstseq () && m->has_lastseq()) { - m_minLedger = m->firstseq (); - m_maxLedger = m->lastseq (); + minLedger_ = m->firstseq (); + maxLedger_ = m->lastseq (); // Work around some servers that report sequences incorrectly - if (m_minLedger == 0) - m_maxLedger = 0; - if (m_maxLedger == 0) - m_minLedger = 0; + if (minLedger_ == 0) + maxLedger_ = 0; + if (maxLedger_ == 0) + minLedger_ = 0; } return ec; } @@ -1211,7 +1561,8 @@ PeerImp::on_message (std::shared_ptr const& m) { Application::ScopedLockType lock (getApp ().getMasterLock ()); - if (!getApp().getOPs ().hasTXSet (shared_from_this (), hash, m->status ())) + if (!getApp().getOPs ().hasTXSet ( + shared_from_this (), hash, m->status ())) charge (Resource::feeUnwantedData); } return ec; @@ -1225,7 +1576,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (m->validation ().size () < 50) { - m_journal.warning << "Too small validation from peer"; + journal_.warning << "Too small validation from peer"; charge (Resource::feeInvalidRequest); return ec; } @@ -1234,18 +1585,20 @@ PeerImp::on_message (std::shared_ptr const& m) { Serializer s (m->validation ()); SerializerIterator sit (s); - SerializedValidation::pointer val = std::make_shared (std::ref (sit), false); + SerializedValidation::pointer val = std::make_shared < + SerializedValidation> (std::ref (sit), false); if (closeTime > (120 + val->getFieldU32(sfSigningTime))) { - m_journal.trace << "Validation is more than two minutes old"; + journal_.trace << "Validation is more than two minutes old"; charge (Resource::feeUnwantedData); return ec; } - if (! getApp().getHashRouter ().addSuppressionPeer (s.getSHA512Half(), m_shortId)) + if (! getApp().getHashRouter ().addSuppressionPeer ( + s.getSHA512Half(), shortId_)) { - m_journal.trace << "Validation is duplicate"; + journal_.trace << "Validation is duplicate"; return ec; } @@ -1255,20 +1608,20 @@ PeerImp::on_message (std::shared_ptr const& m) getApp().getJobQueue ().addJob ( isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, "recvValidation->checkValidation", - std::bind ( - &PeerImp::checkValidation, std::placeholders::_1, - &m_overlay, val, isTrusted, m_clusterNode, m, - std::weak_ptr (shared_from_this ()))); + std::bind (&PeerImp::checkValidation, std::placeholders::_1, + &overlay_, val, isTrusted, clusterNode_, m, + std::weak_ptr (shared_from_this ()), + journal_)); } else { - m_journal.debug << + journal_.debug << "Dropping UNTRUSTED validation due to load"; } } catch (...) { - m_journal.warning << + journal_.warning << "Exception processing validation"; charge (Resource::feeInvalidRequest); } @@ -1314,13 +1667,15 @@ PeerImp::on_message (std::shared_ptr const& m) memcpy (hash.begin (), obj.hash ().data (), 256 / 8); // VFALCO TODO Move this someplace more sensible so we dont // need to inject the NodeStore interfaces. - NodeObject::pointer hObj = getApp().getNodeStore ().fetch (hash); + NodeObject::pointer hObj = + getApp().getNodeStore ().fetch (hash); if (hObj) { protocol::TMIndexedObject& newObj = *reply.add_objects (); newObj.set_hash (hash.begin (), hash.size ()); - newObj.set_data (&hObj->getData ().front (), hObj->getData ().size ()); + newObj.set_data (&hObj->getData ().front (), + hObj->getData ().size ()); if (obj.has_nodeid ()) newObj.set_index (obj.nodeid ()); @@ -1331,7 +1686,7 @@ PeerImp::on_message (std::shared_ptr const& m) } } - m_journal.trace << "GetObjByHash had " << reply.objects_size () << + journal_.trace << "GetObjByHash had " << reply.objects_size () << " of " << packet.objects_size () << " for " << to_string (this); send (std::make_shared (reply, protocol::mtGET_OBJECTS)); @@ -1354,14 +1709,16 @@ PeerImp::on_message (std::shared_ptr const& m) if (obj.ledgerseq () != pLSeq) { if ((pLDo && (pLSeq != 0)) && - m_journal.active(beast::Journal::Severity::kDebug)) - m_journal.debug << "Received full fetch pack for " << pLSeq; + journal_.active(beast::Journal::Severity::kDebug)) + journal_.debug << + "Received full fetch pack for " << pLSeq; pLSeq = obj.ledgerseq (); pLDo = !getApp().getOPs ().haveLedger (pLSeq); if (!pLDo) - m_journal.debug << "Got pack for " << pLSeq << " too late"; + journal_.debug << + "Got pack for " << pLSeq << " too late"; else progress = true; } @@ -1382,8 +1739,8 @@ PeerImp::on_message (std::shared_ptr const& m) } if ((pLDo && (pLSeq != 0)) && - m_journal.active(beast::Journal::Severity::kDebug)) - m_journal.debug << "Received partial fetch pack for " << pLSeq; + journal_.active(beast::Journal::Severity::kDebug)) + journal_.debug << "Received partial fetch pack for " << pLSeq; if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK) getApp().getOPs ().gotFetchPack (progress, pLSeq); @@ -1393,51 +1750,6 @@ PeerImp::on_message (std::shared_ptr const& m) //------------------------------------------------------------------------------ -/** A peer has sent us transaction set data */ -// VFALCO TODO Make this non-static -static void peerTXData (Job&, - std::weak_ptr wPeer, - uint256 const& hash, - std::shared_ptr pPacket, - beast::Journal journal) -{ - std::shared_ptr peer = wPeer.lock (); - if (!peer) - return; - - protocol::TMLedgerData& packet = *pPacket; - - std::list nodeIDs; - std::list< Blob > nodeData; - for (int i = 0; i < packet.nodes ().size (); ++i) - { - const protocol::TMLedgerNode& node = packet.nodes (i); - - if (!node.has_nodeid () || !node.has_nodedata () || (node.nodeid ().size () != 33)) - { - journal.warning << "LedgerData request with invalid node ID"; - peer->charge (Resource::feeInvalidRequest); - return; - } - - nodeIDs.push_back (SHAMapNodeID {node.nodeid ().data (), - static_cast(node.nodeid ().size ())}); - nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ())); - } - - SHAMapAddNode san; - { - Application::ScopedLockType lock (getApp ().getMasterLock ()); - - san = getApp().getOPs().gotTXData (peer, hash, nodeIDs, nodeData); - } - - if (san.isInvalid ()) - { - peer->charge (Resource::feeUnwantedData); - } -} - // VFALCO NOTE This function is way too big and cumbersome. void PeerImp::getLedger (protocol::TMGetLedger& packet) @@ -1454,13 +1766,13 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (packet.itype () == protocol::liTS_CANDIDATE) { // Request is for a transaction candidate set - m_journal.trace << "Received request for TX candidate set data " + journal_.trace << "Received request for TX candidate set data " << to_string (this); if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32)) { charge (Resource::feeInvalidRequest); - m_journal.warning << "invalid request for TX candidate set data"; + journal_.warning << "invalid request for TX candidate set data"; return; } @@ -1476,7 +1788,7 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) { if (packet.has_querytype () && !packet.has_requestcookie ()) { - m_journal.debug << "Trying to route TX set request"; + journal_.debug << "Trying to route TX set request"; struct get_usable_peers { @@ -1502,23 +1814,24 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) } }; - Overlay::PeerSequence usablePeers (m_overlay.foreach ( + Overlay::PeerSequence usablePeers (overlay_.foreach ( get_usable_peers (txHash, this))); if (usablePeers.empty ()) { - m_journal.info << "Unable to route TX set request"; + journal_.info << "Unable to route TX set request"; return; } - Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()]; + Peer::ptr const& selectedPeer = usablePeers [ + rand () % usablePeers.size ()]; packet.set_requestcookie (getShortId ()); - selectedPeer->send ( - std::make_shared (packet, protocol::mtGET_LEDGER)); + selectedPeer->send (std::make_shared ( + packet, protocol::mtGET_LEDGER)); return; } - m_journal.error << "We do not have the map our peer wants " + journal_.error << "We do not have the map our peer wants " << to_string (this); charge (Resource::feeInvalidRequest); @@ -1533,14 +1846,14 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) } else { - if (getApp().getFeeTrack().isLoadedLocal() && !m_clusterNode) + if (getApp().getFeeTrack().isLoadedLocal() && !clusterNode_) { - m_journal.debug << "Too busy to fetch ledger data"; + journal_.debug << "Too busy to fetch ledger data"; return; } // Figure out what ledger they want - m_journal.trace << "Received request for ledger data " + journal_.trace << "Received request for ledger data " << to_string (this); Ledger::pointer ledger; @@ -1551,7 +1864,7 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (packet.ledgerhash ().size () != 32) { charge (Resource::feeInvalidRequest); - m_journal.warning << "Invalid request"; + journal_.warning << "Invalid request"; return; } @@ -1560,19 +1873,20 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) logMe += to_string (ledgerhash); ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash); - if (!ledger && m_journal.trace) - m_journal.trace << "Don't have ledger " << ledgerhash; + if (!ledger && journal_.trace) + journal_.trace << "Don't have ledger " << ledgerhash; - if (!ledger && (packet.has_querytype () && !packet.has_requestcookie ())) + if (!ledger && (packet.has_querytype () && + !packet.has_requestcookie ())) { std::uint32_t seq = 0; if (packet.has_ledgerseq ()) seq = packet.ledgerseq (); - Overlay::PeerSequence peerList = m_overlay.getActivePeers (); + Overlay::PeerSequence peerList = overlay_.getActivePeers (); Overlay::PeerSequence usablePeers; - BOOST_FOREACH (Peer::ptr const& peer, peerList) + for (auto const& peer : peerList) { if (peer->hasLedger (ledgerhash, seq) && (peer.get () != this)) usablePeers.push_back (peer); @@ -1580,28 +1894,31 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (usablePeers.empty ()) { - m_journal.trace << "Unable to route ledger request"; + journal_.trace << "Unable to route ledger request"; return; } - Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()]; + Peer::ptr const& selectedPeer = usablePeers [ + rand () % usablePeers.size ()]; packet.set_requestcookie (getShortId ()); selectedPeer->send ( std::make_shared (packet, protocol::mtGET_LEDGER)); - m_journal.debug << "Ledger request routed"; + journal_.debug << "Ledger request routed"; return; } } else if (packet.has_ledgerseq ()) { - if (packet.ledgerseq() < getApp().getLedgerMaster().getEarliestFetch()) + if (packet.ledgerseq() < + getApp().getLedgerMaster().getEarliestFetch()) { - m_journal.debug << "Peer requests early ledger"; + journal_.debug << "Peer requests early ledger"; return; } - ledger = getApp().getLedgerMaster ().getLedgerBySeq (packet.ledgerseq ()); - if (!ledger && m_journal.debug) - m_journal.debug << "Don't have ledger " << packet.ledgerseq (); + ledger = getApp().getLedgerMaster ().getLedgerBySeq ( + packet.ledgerseq ()); + if (!ledger && journal_.debug) + journal_.debug << "Don't have ledger " << packet.ledgerseq (); } else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT)) { @@ -1612,28 +1929,31 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) ledger = getApp().getLedgerMaster ().getClosedLedger (); if (ledger && !ledger->isClosed ()) - ledger = getApp().getLedgerMaster ().getLedgerBySeq (ledger->getLedgerSeq () - 1); + ledger = getApp().getLedgerMaster ().getLedgerBySeq ( + ledger->getLedgerSeq () - 1); } else { charge (Resource::feeInvalidRequest); - m_journal.warning << "Can't figure out what ledger they want"; + journal_.warning << "Can't figure out what ledger they want"; return; } - if ((!ledger) || (packet.has_ledgerseq () && (packet.ledgerseq () != ledger->getLedgerSeq ()))) + if ((!ledger) || (packet.has_ledgerseq () && ( + packet.ledgerseq () != ledger->getLedgerSeq ()))) { charge (Resource::feeInvalidRequest); - if (m_journal.warning && ledger) - m_journal.warning << "Ledger has wrong sequence"; + if (journal_.warning && ledger) + journal_.warning << "Ledger has wrong sequence"; return; } - if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < getApp().getLedgerMaster().getEarliestFetch())) + if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < + getApp().getLedgerMaster().getEarliestFetch())) { - m_journal.debug << "Peer requests early ledger"; + journal_.debug << "Peer requests early ledger"; return; } @@ -1646,10 +1966,11 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (packet.itype () == protocol::liBASE) { // they want the ledger base data - m_journal.trace << "They want ledger base data"; + journal_.trace << "They want ledger base data"; Serializer nData (128); ledger->addRaw (nData); - reply.add_nodes ()->set_nodedata (nData.getDataPtr (), nData.getLength ()); + reply.add_nodes ()->set_nodedata ( + nData.getDataPtr (), nData.getLength ()); SHAMap::pointer map = ledger->peekAccountStateMap (); @@ -1660,7 +1981,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (map->getRootNode (rootNode, snfWIRE)) { - reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ()); + reply.add_nodes ()->set_nodedata ( + rootNode.getDataPtr (), rootNode.getLength ()); if (ledger->getTransHash ().isNonZero ()) { @@ -1671,13 +1993,16 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) rootNode.erase (); if (map->getRootNode (rootNode, snfWIRE)) - reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ()); + reply.add_nodes ()->set_nodedata ( + rootNode.getDataPtr (), + rootNode.getLength ()); } } } } - Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA); + Message::pointer oPacket = std::make_shared ( + reply, protocol::mtLEDGER_DATA); send (oPacket); return; } @@ -1698,12 +2023,12 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (!map || (packet.nodeids_size () == 0)) { - m_journal.warning << "Can't find map or empty request"; + journal_.warning << "Can't find map or empty request"; charge (Resource::feeInvalidRequest); return; } - m_journal.trace << "Request: " << logMe; + journal_.trace << "Request: " << logMe; for (int i = 0; i < packet.nodeids ().size (); ++i) { @@ -1711,7 +2036,7 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (!mn.isValid ()) { - m_journal.warning << "Request for invalid node: " << logMe; + journal_.warning << "Request for invalid node: " << logMe; charge (Resource::feeInvalidRequest); return; } @@ -1724,22 +2049,26 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves)) { assert (nodeIDs.size () == rawNodes.size ()); - m_journal.trace << "getNodeFat got " << rawNodes.size () << " nodes"; + journal_.trace << + "getNodeFat got " << rawNodes.size () << " nodes"; std::vector::iterator nodeIDIterator; std::list< Blob >::iterator rawNodeIterator; - for (nodeIDIterator = nodeIDs.begin (), rawNodeIterator = rawNodes.begin (); - nodeIDIterator != nodeIDs.end (); ++nodeIDIterator, ++rawNodeIterator) + for (nodeIDIterator = nodeIDs.begin (), + rawNodeIterator = rawNodes.begin (); + nodeIDIterator != nodeIDs.end (); + ++nodeIDIterator, ++rawNodeIterator) { Serializer nID (33); nodeIDIterator->addIDRaw (nID); protocol::TMLedgerNode* node = reply.add_nodes (); node->set_nodeid (nID.getDataPtr (), nID.getLength ()); - node->set_nodedata (&rawNodeIterator->front (), rawNodeIterator->size ()); + node->set_nodedata (&rawNodeIterator->front (), + rawNodeIterator->size ()); } } else - m_journal.warning << "getNodeFat returns false"; + journal_.warning << "getNodeFat returns false"; } catch (std::exception&) { @@ -1757,18 +2086,493 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (!packet.has_ledgerhash ()) info += ", no hash specified"; - m_journal.warning << "getNodeFat( " << mn << ") throws exception: " << info; + journal_.warning << + "getNodeFat( " << mn << ") throws exception: " << info; } } - Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA); + Message::pointer oPacket = std::make_shared ( + reply, protocol::mtLEDGER_DATA); send (oPacket); } -// This is dispatched by the job queue -static +//------------------------------------------------------------------------------ + void -sGetLedger (std::weak_ptr wPeer, +PeerImp::detach (const char* rsn, bool graceful) +{ + if (! strand_.running_in_this_thread ()) + { + strand_.post (std::bind (&PeerImp::detach, + shared_from_this (), rsn, graceful)); + return; + } + + if (!detaching_) + { + // NIKB TODO No - a race is NOT ok. This needs to be fixed + // to have PeerFinder work reliably. + detaching_ = true; // Race is ok. + + if (was_canceled_) + peerFinder_.on_cancel (slot_); + else + peerFinder_.on_closed (slot_); + + if (state_ == stateActive) + overlay_.onPeerDisconnect (shared_from_this ()); + + state_ = stateGracefulClose; + + if (clusterNode_ && journal_.active(beast::Journal::Severity::kWarning)) + journal_.warning << "Cluster peer " << name_ << + " detached: " << rsn; + + send_queue_.clear (); + + (void) timer_.cancel (); + + if (graceful) + { + socket_->async_shutdown ( + strand_.wrap ( std::bind( + &PeerImp::handleShutdown, + std::static_pointer_cast (shared_from_this ()), + beast::asio::placeholders::error))); + } + else + { + socket_->cancel (); + } + + // VFALCO TODO Stop doing this. + if (publicKey_.isValid ()) + publicKey_.clear (); // Be idempotent. + } +} + +//-------------------------------------------------------------------------- + +void +PeerImp::sendGetPeers () +{ + // Ask peer for known other peers. + protocol::TMGetPeers msg; + + msg.set_doweneedthis (1); + + Message::pointer packet = std::make_shared ( + msg, protocol::mtGET_PEERS); + + send (packet); +} + +void +PeerImp::charge (std::weak_ptr & peer, Resource::Charge const& fee) +{ + Peer::ptr p (peer.lock()); + + if (p != nullptr) + p->charge (fee); +} + +void +PeerImp::sendForce (const Message::pointer& packet) +{ + // must be on IO strand + if (!detaching_) + { + send_packet_ = packet; + + boost::asio::async_write (*socket_, + boost::asio::buffer (packet->getBuffer ()), + strand_.wrap (std::bind ( + &PeerImp::handleWrite, + std::static_pointer_cast (shared_from_this ()), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); + } +} + +bool +PeerImp::hashLatestFinishedMessage (const SSL *sslSession, unsigned char *hash, + size_t (*getFinishedMessage)(const SSL *, void *buf, size_t)) +{ + unsigned char buf[1024]; + + // Get our finished message and hash it. + std::memset(hash, 0, 64); + + size_t len = getFinishedMessage (sslSession, buf, sizeof (buf)); + + if(len < sslMinimumFinishedLength) + return false; + + SHA512 (buf, len, hash); + + return true; +} + +bool +PeerImp::calculateSessionCookie () +{ + SSL* ssl = socket_->ssl_handle (); + + if (!ssl) + { + journal_.error << "Cookie generation: No underlying connection"; + return false; + } + + unsigned char sha1[64]; + unsigned char sha2[64]; + + if (!hashLatestFinishedMessage(ssl, sha1, SSL_get_finished)) + { + journal_.error << "Cookie generation: local setup not complete"; + return false; + } + + if (!hashLatestFinishedMessage(ssl, sha2, SSL_get_peer_finished)) + { + journal_.error << "Cookie generation: peer setup not complete"; + return false; + } + + // If both messages hash to the same value (i.e. match) something is + // wrong. This would cause the resulting cookie to be 0. + if (memcmp (sha1, sha2, sizeof (sha1)) == 0) + { + journal_.error << "Cookie generation: identical finished messages"; + return false; + } + + for (size_t i = 0; i < sizeof (sha1); ++i) + sha1[i] ^= sha2[i]; + + // Finally, derive the actual cookie for the values that we have + // calculated. + secureCookie_ = Serializer::getSHA512Half (sha1, sizeof(sha1)); + + return true; +} + +bool +PeerImp::sendHello () +{ + if (!calculateSessionCookie()) + return false; + + Blob vchSig; + getApp().getLocalCredentials ().getNodePrivate ().signNodePrivate ( + secureCookie_, vchSig); + + protocol::TMHello h; + + h.set_protoversion (to_packed (BuildInfo::getCurrentProtocol())); + h.set_protoversionmin (to_packed (BuildInfo::getMinimumProtocol())); + h.set_fullversion (BuildInfo::getFullVersionString ()); + h.set_nettime (getApp().getOPs ().getNetworkTimeNC ()); + h.set_nodepublic (getApp().getLocalCredentials ().getNodePublic ( + ).humanNodePublic ()); + h.set_nodeproof (&vchSig[0], vchSig.size ()); + h.set_ipv4port (getConfig ().peerListeningPort); + h.set_testnet (false); + + // We always advertise ourselves as private in the HELLO message. This + // suppresses the old peer advertising code and allows PeerFinder to + // take over the functionality. + h.set_nodeprivate (true); + + Ledger::pointer closedLedger = getApp().getLedgerMaster ().getClosedLedger (); + + if (closedLedger && closedLedger->isClosed ()) + { + uint256 hash = closedLedger->getHash (); + h.set_ledgerclosed (hash.begin (), hash.size ()); + hash = closedLedger->getParentHash (); + h.set_ledgerprevious (hash.begin (), hash.size ()); + } + + Message::pointer packet = std::make_shared ( + h, protocol::mtHELLO); + send (packet); + + return true; +} + +void +PeerImp::addLedger (uint256 const& hash) +{ + std::lock_guard sl(recentLock_); + + if (std::find (recentLedgers_.begin(), + recentLedgers_.end(), hash) != recentLedgers_.end()) + return; + + // VFALCO TODO See if a sorted vector would be better. + + if (recentLedgers_.size () == 128) + recentLedgers_.pop_front (); + + recentLedgers_.push_back (hash); +} + +void +PeerImp::addTxSet (uint256 const& hash) +{ + std::lock_guard sl(recentLock_); + + if (std::find (recentTxSets_.begin (), + recentTxSets_.end (), hash) != recentTxSets_.end ()) + return; + + if (recentTxSets_.size () == 128) + recentTxSets_.pop_front (); + + recentTxSets_.push_back (hash); +} + +void +PeerImp::doFetchPack (const std::shared_ptr& packet) +{ + // VFALCO TODO Invert this dependency using an observer and shared state object. + // Don't queue fetch pack jobs if we're under load or we already have + // some queued. + if (getApp().getFeeTrack ().isLoadedLocal () || + (getApp().getLedgerMaster().getValidatedLedgerAge() > 40) || + (getApp().getJobQueue().getJobCount(jtPACK) > 10)) + { + journal_.info << "Too busy to make fetch pack"; + return; + } + + if (packet->ledgerhash ().size () != 32) + { + journal_.warning << "FetchPack hash size malformed"; + charge (Resource::feeInvalidRequest); + return; + } + + uint256 hash; + memcpy (hash.begin (), packet->ledgerhash ().data (), 32); + + getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack", + std::bind (&NetworkOPs::makeFetchPack, &getApp().getOPs (), + std::placeholders::_1, std::weak_ptr (shared_from_this ()), + packet, hash, UptimeTimer::getInstance ().getElapsedSeconds ())); +} + +void +PeerImp::doProofOfWork (Job&, std::weak_ptr peer, + ProofOfWork::pointer pow) +{ + if (peer.expired ()) + return; + + uint256 solution = pow->solve (); + + if (solution.isZero ()) + { + journal_.warning << "Failed to solve proof of work"; + } + else + { + Peer::ptr pptr (peer.lock ()); + + if (pptr) + { + protocol::TMProofWork reply; + reply.set_token (pow->getToken ()); + reply.set_response (solution.begin (), solution.size ()); + pptr->send (std::make_shared ( + reply, protocol::mtPROOFOFWORK)); + } + else + { + // WRITEME: Save solved proof of work for new connection + } + } +} + +void +PeerImp::checkTransaction (Job&, int flags, + SerializedTransaction::pointer stx, std::weak_ptr peer) +{ + // VFALCO TODO Rewrite to not use exceptions + try + { + // Expired? + if (stx->isFieldPresent(sfLastLedgerSequence) && + (stx->getFieldU32 (sfLastLedgerSequence) < + getApp().getLedgerMaster().getValidLedgerIndex())) + { + getApp().getHashRouter().setFlag(stx->getTransactionID(), SF_BAD); + charge (peer, Resource::feeUnwantedData); + return; + } + + auto validate = (flags & SF_SIGGOOD) ? Validate::NO : Validate::YES; + auto tx = std::make_shared (stx, validate); + + if (tx->getStatus () == INVALID) + { + getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); + charge (peer, Resource::feeInvalidSignature); + return; + } + else + getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_SIGGOOD); + + bool const trusted (flags & SF_TRUSTED); + getApp().getOPs ().processTransaction (tx, trusted, false, false); + } + catch (...) + { + getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); + charge (peer, Resource::feeInvalidRequest); + } +} + +// Called from our JobQueue +void +PeerImp::checkPropose (Job& job, Overlay* pPeers, + std::shared_ptr packet, + LedgerProposal::pointer proposal, uint256 consensusLCL, + RippleAddress nodePublic, std::weak_ptr peer, + bool fromCluster, beast::Journal journal) +{ + bool sigGood = false; + bool isTrusted = (job.getType () == jtPROPOSAL_t); + + journal.trace << + "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal"; + + assert (packet); + protocol::TMProposeSet& set = *packet; + + uint256 prevLedger; + + if (set.has_previousledger ()) + { + // proposal includes a previous ledger + journal.trace << + "proposal with previous ledger"; + memcpy (prevLedger.begin (), set.previousledger ().data (), 256 / 8); + + if (!fromCluster && !proposal->checkSign (set.signature ())) + { + Peer::ptr p = peer.lock (); + journal.warning << + "proposal with previous ledger fails sig check: " << *p; + charge (peer, Resource::feeInvalidSignature); + return; + } + else + sigGood = true; + } + else + { + if (consensusLCL.isNonZero () && proposal->checkSign (set.signature ())) + { + prevLedger = consensusLCL; + sigGood = true; + } + else + { + // Could be mismatched prev ledger + journal.warning << + "Ledger proposal fails signature check"; + proposal->setSignature (set.signature ()); + } + } + + if (isTrusted) + { + getApp().getOPs ().processTrustedProposal ( + proposal, packet, nodePublic, prevLedger, sigGood); + } + else if (sigGood && (prevLedger == consensusLCL)) + { + // relay untrusted proposal + journal.trace << + "relaying UNTRUSTED proposal"; + std::set peers; + + if (getApp().getHashRouter ().swapSet ( + proposal->getSuppressionID (), peers, SF_RELAYED)) + { + pPeers->foreach (send_if_not ( + std::make_shared (set, protocol::mtPROPOSE_LEDGER), + peer_in_set(peers))); + } + } + else + { + journal.debug << + "Not relaying UNTRUSTED proposal"; + } +} + +void +PeerImp::checkValidation (Job&, Overlay* pPeers, + SerializedValidation::pointer val, bool isTrusted, bool isCluster, + std::shared_ptr packet, + std::weak_ptr peer, beast::Journal journal) +{ + try + { + uint256 signingHash = val->getSigningHash(); + if (!isCluster && !val->isValid (signingHash)) + { + journal.warning << + "Validation is invalid"; + charge (peer, Resource::feeInvalidRequest); + return; + } + + std::string source; + Peer::ptr lp = peer.lock (); + + if (lp) + source = to_string(*lp); + else + source = "unknown"; + + std::set peers; + + //---------------------------------------------------------------------- + // + { + SerializedValidation const& sv (*val); + Validators::ReceivedValidation rv; + rv.ledgerHash = sv.getLedgerHash (); + rv.publicKey = sv.getSignerPublic(); + getApp ().getValidators ().on_receive_validation (rv); + } + // + //---------------------------------------------------------------------- + + if (getApp().getOPs ().recvValidation (val, source) && + getApp().getHashRouter ().swapSet ( + signingHash, peers, SF_RELAYED)) + { + pPeers->foreach (send_if_not ( + std::make_shared (*packet, protocol::mtVALIDATION), + peer_in_set(peers))); + } + } + catch (...) + { + journal.trace << + "Exception processing validation"; + charge (peer, Resource::feeInvalidRequest); + } +} + +// This is dispatched by the job queue +void +PeerImp::sGetLedger (std::weak_ptr wPeer, std::shared_ptr packet) { std::shared_ptr peer = wPeer.lock (); @@ -1777,4 +2581,48 @@ sGetLedger (std::weak_ptr wPeer, peer->getLedger (*packet); } +// VFALCO TODO Make this non-static +void +PeerImp::peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, + std::shared_ptr pPacket, beast::Journal journal) +{ + std::shared_ptr peer = wPeer.lock (); + if (!peer) + return; + + protocol::TMLedgerData& packet = *pPacket; + + std::list nodeIDs; + std::list< Blob > nodeData; + for (int i = 0; i < packet.nodes ().size (); ++i) + { + const protocol::TMLedgerNode& node = packet.nodes (i); + + if (!node.has_nodeid () || !node.has_nodedata () || ( + node.nodeid ().size () != 33)) + { + journal.warning << "LedgerData request with invalid node ID"; + peer->charge (Resource::feeInvalidRequest); + return; + } + + nodeIDs.push_back (SHAMapNodeID {node.nodeid ().data (), + static_cast(node.nodeid ().size ())}); + nodeData.push_back (Blob (node.nodedata ().begin (), + node.nodedata ().end ())); + } + + SHAMapAddNode san; + { + Application::ScopedLockType lock (getApp ().getMasterLock ()); + + san = getApp().getOPs().gotTXData (peer, hash, nodeIDs, nodeData); + } + + if (san.isInvalid ()) + { + peer->charge (Resource::feeUnwantedData); + } +} + } // ripple diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 5e7abcf1bc..5883d74425 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -73,21 +73,11 @@ class PeerImp , private beast::LeakChecked , private abstract_protocol_handler { -private: - /** Time alloted for a peer to send a HELLO message (DEPRECATED) */ - static const boost::posix_time::seconds nodeVerifySeconds; - - /** The clock drift we allow a remote peer to have */ - static const std::uint32_t clockToleranceDeltaSeconds = 20; - - /** The length of the smallest valid finished message */ - static const size_t sslMinimumFinishedLength = 12; - public: /** Current state */ enum State { - /** An connection is being established (outbound) */ + /** A connection is being established (outbound) */ stateConnecting /** Connection has been successfully established */ @@ -105,70 +95,82 @@ public: typedef std::shared_ptr ptr; +private: + // Time alloted for a peer to send a HELLO message (DEPRECATED) + static const boost::posix_time::seconds nodeVerifySeconds; + + // The clock drift we allow a remote peer to have + static const std::uint32_t clockToleranceDeltaSeconds = 20; + + // The length of the smallest valid finished message + static const size_t sslMinimumFinishedLength = 12; + NativeSocketType m_owned_socket; - beast::Journal m_journal; + beast::Journal journal_; // A unique identifier (up to a restart of rippled) for this particular // peer instance. A peer that disconnects will, upon reconnection, get a // new ID. - ShortId m_shortId; + ShortId shortId_ = 0; // Updated at each stage of the connection process to reflect // the current conditions as closely as possible. This includes // the case where we learn the true IP via a PROXY handshake. - beast::IP::Endpoint m_remoteAddress; + beast::IP::Endpoint remote_address_; // These is up here to prevent warnings about order of initializations // - Resource::Manager& m_resourceManager; - PeerFinder::Manager& m_peerFinder; - OverlayImpl& m_overlay; + Resource::Manager& resourceManager_; + PeerFinder::Manager& peerFinder_; + OverlayImpl& overlay_; bool m_inbound; - std::unique_ptr m_socket; - boost::asio::io_service::strand m_strand; + std::unique_ptr socket_; + boost::asio::io_service::strand strand_; - State m_state; // Current state - bool m_detaching; // True if detaching. - bool m_clusterNode; // True if peer is a node in our cluster - RippleAddress m_nodePublicKey; // Node public key of peer. - std::string m_nodeName; + State state_; // Current state + bool detaching_ = false; + + // True if peer is a node in our cluster + bool clusterNode_ = false; + + // Node public key of peer. + RippleAddress publicKey_; + + std::string name_; // Both sides of the peer calculate this value and verify that it matches // to detect/prevent man-in-the-middle attacks. // - uint256 m_secureCookie; + uint256 secureCookie_; // The indices of the smallest and largest ledgers this peer has available // - LedgerIndex m_minLedger; - LedgerIndex m_maxLedger; + LedgerIndex minLedger_; + LedgerIndex maxLedger_; - uint256 m_closedLedgerHash; - uint256 m_previousLedgerHash; + uint256 closedLedgerHash_; + uint256 previousLedgerHash_; - std::list m_recentLedgers; - std::list m_recentTxSets; - mutable std::mutex m_recentLock; + std::list recentLedgers_; + std::list recentTxSets_; + mutable std::mutex recentLock_; - boost::asio::deadline_timer timer_; + boost::asio::deadline_timer timer_; - std::vector m_readBuffer; - std::list mSendQ; - Message::pointer mSendingPacket; - protocol::TMStatusChange mLastStatus; - protocol::TMHello mHello; + std::list send_queue_; + Message::pointer send_packet_; + protocol::TMStatusChange last_status_; + protocol::TMHello hello_; - Resource::Consumer m_usage; + Resource::Consumer usage_; // The slot assigned to us by PeerFinder - PeerFinder::Slot::ptr m_slot; + PeerFinder::Slot::ptr slot_; // True if close was called - bool m_was_canceled; - - + bool was_canceled_ = false; boost::asio::streambuf read_buffer_; boost::optional http_message_; @@ -176,102 +178,109 @@ public: message_stream message_stream_; boost::asio::streambuf write_buffer_; - bool write_pending_; - + std::unique_ptr load_event_; //-------------------------------------------------------------------------- - /** New incoming peer from the specified socket */ - PeerImp ( - NativeSocketType&& socket, - beast::IP::Endpoint remoteAddress, - OverlayImpl& overlay, - Resource::Manager& resourceManager, - PeerFinder::Manager& peerFinder, - PeerFinder::Slot::ptr const& slot, - boost::asio::ssl::context& ssl_context, - MultiSocket::Flag flags) - : m_owned_socket (std::move (socket)) - , m_journal (deprecatedLogs().journal("Peer")) - , m_shortId (0) - , m_remoteAddress (remoteAddress) - , m_resourceManager (resourceManager) - , m_peerFinder (peerFinder) - , m_overlay (overlay) - , m_inbound (true) - , m_socket (MultiSocket::New ( - m_owned_socket, ssl_context, flags.asBits ())) - , m_strand (m_owned_socket.get_io_service()) - , m_state (stateConnected) - , m_detaching (false) - , m_clusterNode (false) - , m_minLedger (0) - , m_maxLedger (0) - , timer_ (m_owned_socket.get_io_service()) - , m_slot (slot) - , m_was_canceled (false) - , message_stream_(*this) - , write_pending_ (false) - { - } +public: + /** Create an incoming peer from the specified socket */ + PeerImp (NativeSocketType&& socket, beast::IP::Endpoint remoteAddress, + OverlayImpl& overlay, Resource::Manager& resourceManager, + PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot, + boost::asio::ssl::context& ssl_context, MultiSocket::Flag flags); - /** New outgoing peer + /** Create an outgoing peer @note Construction of outbound peers is a two step process: a second call is needed (to connect or accept) but we cannot make it from inside the constructor because you cannot call shared_from_this from inside constructors. */ - PeerImp ( - beast::IP::Endpoint remoteAddress, - boost::asio::io_service& io_service, - OverlayImpl& overlay, - Resource::Manager& resourceManager, - PeerFinder::Manager& peerFinder, - PeerFinder::Slot::ptr const& slot, - boost::asio::ssl::context& ssl_context, - MultiSocket::Flag flags) - : m_owned_socket (io_service) - , m_journal (deprecatedLogs().journal("Peer")) - , m_shortId (0) - , m_remoteAddress (remoteAddress) - , m_resourceManager (resourceManager) - , m_peerFinder (peerFinder) - , m_overlay (overlay) - , m_inbound (false) - , m_socket (MultiSocket::New ( - io_service, ssl_context, flags.asBits ())) - , m_strand (io_service) - , m_state (stateConnecting) - , m_detaching (false) - , m_clusterNode (false) - , m_minLedger (0) - , m_maxLedger (0) - , timer_ (io_service) - , m_slot (slot) - , m_was_canceled (false) - , message_stream_(*this) - , write_pending_ (false) - { - } + PeerImp (beast::IP::Endpoint remoteAddress, boost::asio::io_service& io_service, + OverlayImpl& overlay, Resource::Manager& resourceManager, + PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot, + boost::asio::ssl::context& ssl_context, MultiSocket::Flag flags); virtual - ~PeerImp () - { - m_overlay.remove (m_slot); - } + ~PeerImp (); PeerImp (PeerImp const&) = delete; PeerImp& operator= (PeerImp const&) = delete; - MultiSocket& getStream () - { - return *m_socket; - } + // Begin asynchronous initiation function calls + void + start (); - static char const* getCountedObjectName () { return "Peer"; } + /** Indicates that the peer must be activated. + A peer is activated after the handshake is completed and if it is not + a second connection from a peer that we already have. Once activated + the peer transitions to `stateActive` and begins operating. + */ + void + activate (); - void getLedger (protocol::TMGetLedger& packet); + /** Close the connection. */ + void close (bool graceful); + + void + getLedger (protocol::TMGetLedger& packet); + + // + // Network + // + + void + send (Message::pointer const& m) override; + + beast::IP::Endpoint + getRemoteAddress() const override; + + void + charge (Resource::Charge const& fee) override; + + // + // Identity + // + + Peer::ShortId + getShortId () const override; + + RippleAddress const& + getNodePublic () const; + + Json::Value + json() override; + + bool + isInCluster () const override; + + std::string const& + getClusterNodeName() const override; + + // + // Ledger + // + + uint256 const& + getClosedLedgerHash () const override; + + bool + hasLedger (uint256 const& hash, std::uint32_t seq) const override; + + void + ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const override; + + bool + hasTxSet (uint256 const& hash) const override; + + void + cycleStatus () override; + + bool + supportsVersion (int version) override; + + bool + hasRange (std::uint32_t uMin, std::uint32_t uMax) override; private: // @@ -331,7 +340,14 @@ private: void on_write_protocol (error_code ec, std::size_t bytes_transferred); - //-------------------------------------------------------------------------- + void + handleShutdown (boost::system::error_code const& ec); + + void + handleWrite (boost::system::error_code const& ec, size_t bytes); + + void + handleVerifyTimer (boost::system::error_code const& ec); //-------------------------------------------------------------------------- // @@ -377,18 +393,19 @@ private: //-------------------------------------------------------------------------- -public: +private: State state() const { - return m_state; + return state_; } void state (State new_state) { - m_state = new_state; + state_ = new_state; } //-------------------------------------------------------------------------- + /** Disconnect a peer The peer transitions from its current state into `stateGracefulClose` @@ -397,381 +414,20 @@ public: @param onIOStrand true if called on an I/O strand. It if is not, then a callback will be queued up. */ - void detach (const char* rsn, bool graceful = true) - { - if (! m_strand.running_in_this_thread ()) - { - m_strand.post (std::bind (&PeerImp::detach, - shared_from_this (), rsn, graceful)); - return; - } - - if (!m_detaching) - { - // NIKB TODO No - a race is NOT ok. This needs to be fixed - // to have PeerFinder work reliably. - m_detaching = true; // Race is ok. - - if (m_was_canceled) - m_peerFinder.on_cancel (m_slot); - else - m_peerFinder.on_closed (m_slot); - - if (m_state == stateActive) - m_overlay.onPeerDisconnect (shared_from_this ()); - - m_state = stateGracefulClose; - - if (m_clusterNode && m_journal.active(beast::Journal::Severity::kWarning)) - m_journal.warning << "Cluster peer " << m_nodeName << - " detached: " << rsn; - - mSendQ.clear (); - - (void) timer_.cancel (); - - if (graceful) - { - m_socket->async_shutdown ( - m_strand.wrap ( std::bind( - &PeerImp::handleShutdown, - std::static_pointer_cast (shared_from_this ()), - beast::asio::placeholders::error))); - } - else - { - m_socket->cancel (); - } - - // VFALCO TODO Stop doing this. - if (m_nodePublicKey.isValid ()) - m_nodePublicKey.clear (); // Be idempotent. - } - } - - /** Close the connection. */ - void close (bool graceful) - { - m_was_canceled = true; - detach ("stop", graceful); - } - - /** Indicates that the peer must be activated. - A peer is activated after the handshake is completed and if it is not - a second connection from a peer that we already have. Once activated - the peer transitions to `stateActive` and begins operating. - */ - void activate () - { - bassert (m_state == stateHandshaked); - m_state = stateActive; - bassert(m_shortId == 0); - m_shortId = m_overlay.next_id(); - m_overlay.onPeerActivated(shared_from_this ()); - } - - void start () - { - if (m_inbound) - do_accept (); - else - do_connect (); - } - - //-------------------------------------------------------------------------- - std::string getClusterNodeName() const - { - return m_nodeName; - } - - //-------------------------------------------------------------------------- + void + detach (const char* rsn, bool graceful = true); void - send (Message::pointer const& m) override - { - if (m) - { - if (m_strand.running_in_this_thread()) - { - if (mSendingPacket) - mSendQ.push_back (m); - else - sendForce (m); - } - else - { - m_strand.post (std::bind (&PeerImp::send, shared_from_this(), m)); - } + sendGetPeers (); - } - } + static + void + charge (std::weak_ptr & peer, Resource::Charge const& fee); - void sendGetPeers () - { - // Ask peer for known other peers. - protocol::TMGetPeers msg; - - msg.set_doweneedthis (1); - - Message::pointer packet = std::make_shared ( - msg, protocol::mtGET_PEERS); - - send (packet); - } - - void charge (Resource::Charge const& fee) - { - if ((m_usage.charge (fee) == Resource::drop) && m_usage.disconnect ()) - detach ("resource"); - } - - static void charge (std::weak_ptr & peer, Resource::Charge const& fee) - { - Peer::ptr p (peer.lock()); - - if (p != nullptr) - p->charge (fee); - } - - Json::Value json () - { - Json::Value ret (Json::objectValue); - - ret["public_key"] = m_nodePublicKey.ToString (); - ret["address"] = m_remoteAddress.to_string(); - - if (m_inbound) - ret["inbound"] = true; - - if (m_clusterNode) - { - ret["cluster"] = true; - - if (!m_nodeName.empty ()) - ret["name"] = m_nodeName; - } - - if (mHello.has_fullversion ()) - ret["version"] = mHello.fullversion (); - - if (mHello.has_protoversion ()) - { - auto protocol = BuildInfo::make_protocol (mHello.protoversion ()); - - if (protocol != BuildInfo::getCurrentProtocol()) - ret["protocol"] = to_string (protocol); - } - - std::uint32_t minSeq, maxSeq; - ledgerRange(minSeq, maxSeq); - - if ((minSeq != 0) || (maxSeq != 0)) - ret["complete_ledgers"] = boost::lexical_cast(minSeq) + " - " + - boost::lexical_cast(maxSeq); - - if (m_closedLedgerHash != zero) - ret["ledger"] = to_string (m_closedLedgerHash); - - if (mLastStatus.has_newstatus ()) - { - switch (mLastStatus.newstatus ()) - { - case protocol::nsCONNECTING: - ret["status"] = "connecting"; - break; - - case protocol::nsCONNECTED: - ret["status"] = "connected"; - break; - - case protocol::nsMONITORING: - ret["status"] = "monitoring"; - break; - - case protocol::nsVALIDATING: - ret["status"] = "validating"; - break; - - case protocol::nsSHUTTING: - ret["status"] = "shutting"; - break; - - default: - // FIXME: do we really want this? - m_journal.warning << "Unknown status: " << mLastStatus.newstatus (); - } - } - - return ret; - } - - bool isInCluster () const - { - return m_clusterNode; - } - - uint256 const& getClosedLedgerHash () const - { - return m_closedLedgerHash; - } - - bool hasLedger (uint256 const& hash, std::uint32_t seq) const - { - std::lock_guard sl(m_recentLock); - - if ((seq != 0) && (seq >= m_minLedger) && (seq <= m_maxLedger)) - return true; - - BOOST_FOREACH (uint256 const& ledger, m_recentLedgers) - { - if (ledger == hash) - return true; - } - - return false; - } - - void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const - { - std::lock_guard sl(m_recentLock); - - minSeq = m_minLedger; - maxSeq = m_maxLedger; - } - - bool hasTxSet (uint256 const& hash) const - { - std::lock_guard sl(m_recentLock); - BOOST_FOREACH (uint256 const& set, m_recentTxSets) - - if (set == hash) - return true; - - return false; - } - - Peer::ShortId getShortId () const - { - return m_shortId; - } - - RippleAddress const& getNodePublic () const - { - return m_nodePublicKey; - } - - void cycleStatus () - { - m_previousLedgerHash = m_closedLedgerHash; - m_closedLedgerHash.zero (); - } - - bool supportsVersion (int version) - { - return mHello.has_protoversion () && (mHello.protoversion () >= version); - } - - bool hasRange (std::uint32_t uMin, std::uint32_t uMax) - { - return (uMin >= m_minLedger) && (uMax <= m_maxLedger); - } - - beast::IP::Endpoint getRemoteAddress() const - { - return m_remoteAddress; - } - -private: - void handleShutdown (boost::system::error_code const& ec) - { - if (m_detaching) - return; - - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec) - { - m_journal.info << "Shutdown: " << ec.message (); - detach ("hsd"); - return; - } - } - - void handleWrite (boost::system::error_code const& ec, size_t bytes) - { - if (m_detaching) - return; - - // Call on IO strand - - mSendingPacket.reset (); - - if (ec == boost::asio::error::operation_aborted) - return; - - if (m_detaching) - return; - - if (ec) - { - m_journal.info << "Write: " << ec.message (); - detach ("hw"); - return; - } - - if (!mSendQ.empty ()) - { - Message::pointer packet = mSendQ.front (); - - if (packet) - { - sendForce (packet); - mSendQ.pop_front (); - } - } - } - - void handleVerifyTimer (boost::system::error_code const& ec) - { - if (m_detaching) - return; - - if (ec == boost::asio::error::operation_aborted) - { - // Timer canceled because deadline no longer needed. - } - else if (ec) - { - m_journal.info << "Peer verify timer error"; - } - else - { - // m_journal.info << "Verify: Peer failed to verify in time."; - - detach ("hvt"); - } - } - - void sendForce (const Message::pointer& packet) - { - // must be on IO strand - if (!m_detaching) - { - mSendingPacket = packet; - - boost::asio::async_write (getStream (), - boost::asio::buffer (packet->getBuffer ()), - m_strand.wrap (std::bind ( - &PeerImp::handleWrite, - std::static_pointer_cast (shared_from_this ()), - beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); - } - } + void + sendForce (const Message::pointer& packet); /** Hashes the latest finished message from an SSL stream - @param sslSession the session to get the message from. @param hash the buffer into which the hash of the retrieved message will be saved. The buffer MUST be at least @@ -780,381 +436,78 @@ private: finished message. This be either: `SSL_get_finished` or `SSL_get_peer_finished`. - @return `true` if successful, `false` otherwise. - */ - bool hashLatestFinishedMessage (const SSL *sslSession, unsigned char *hash, - size_t (*getFinishedMessage)(const SSL *, void *buf, size_t)) - { - unsigned char buf[1024]; - - // Get our finished message and hash it. - std::memset(hash, 0, 64); - - size_t len = getFinishedMessage (sslSession, buf, sizeof (buf)); - - if(len < sslMinimumFinishedLength) - return false; - - SHA512 (buf, len, hash); - - return true; - } + bool + hashLatestFinishedMessage (const SSL *sslSession, unsigned char *hash, + size_t (*getFinishedMessage)(const SSL *, void *buf, size_t)); /** Generates a secure cookie to protect against man-in-the-middle attacks - This function should never fail under normal circumstances and regular server operation. - A failure prevents the cookie value from being calculated which is an important component of connection security. If this function fails, a secure connection cannot be established and the link MUST be dropped. - @return `true` if the cookie was generated, `false` otherwise. - @note failure is an exceptional situation - it should never happen and will almost always indicate an active man-in-the-middle attack is taking place. */ - bool calculateSessionCookie () - { - SSL* ssl = m_socket->ssl_handle (); - - if (!ssl) - { - m_journal.error << "Cookie generation: No underlying connection"; - return false; - } - - unsigned char sha1[64]; - unsigned char sha2[64]; - - if (!hashLatestFinishedMessage(ssl, sha1, SSL_get_finished)) - { - m_journal.error << "Cookie generation: local setup not complete"; - return false; - } - - if (!hashLatestFinishedMessage(ssl, sha2, SSL_get_peer_finished)) - { - m_journal.error << "Cookie generation: peer setup not complete"; - return false; - } - - // If both messages hash to the same value (i.e. match) something is - // wrong. This would cause the resulting cookie to be 0. - if (memcmp (sha1, sha2, sizeof (sha1)) == 0) - { - m_journal.error << "Cookie generation: identical finished messages"; - return false; - } - - for (size_t i = 0; i < sizeof (sha1); ++i) - sha1[i] ^= sha2[i]; - - // Finally, derive the actual cookie for the values that we have - // calculated. - m_secureCookie = Serializer::getSHA512Half (sha1, sizeof(sha1)); - - return true; - } + bool + calculateSessionCookie (); /** Perform a secure handshake with the peer at the other end. - If this function returns false then we cannot guarantee that there is no active man-in-the-middle attack taking place and the link MUST be disconnected. - @return true if successful, false otherwise. */ - bool sendHello () - { - if (!calculateSessionCookie()) - return false; + bool + sendHello(); - Blob vchSig; - getApp().getLocalCredentials ().getNodePrivate ().signNodePrivate (m_secureCookie, vchSig); + void + addLedger (uint256 const& hash); - protocol::TMHello h; + void + addTxSet (uint256 const& hash); - h.set_protoversion (to_packed (BuildInfo::getCurrentProtocol())); - h.set_protoversionmin (to_packed (BuildInfo::getMinimumProtocol())); - h.set_fullversion (BuildInfo::getFullVersionString ()); - h.set_nettime (getApp().getOPs ().getNetworkTimeNC ()); - h.set_nodepublic (getApp().getLocalCredentials ().getNodePublic ().humanNodePublic ()); - h.set_nodeproof (&vchSig[0], vchSig.size ()); - h.set_ipv4port (getConfig ().peerListeningPort); - h.set_testnet (false); + void + doFetchPack (const std::shared_ptr& packet); - // We always advertise ourselves as private in the HELLO message. This - // suppresses the old peer advertising code and allows PeerFinder to - // take over the functionality. - h.set_nodeprivate (true); + void + doProofOfWork (Job&, std::weak_ptr peer, ProofOfWork::pointer pow); - Ledger::pointer closedLedger = getApp().getLedgerMaster ().getClosedLedger (); - - if (closedLedger && closedLedger->isClosed ()) - { - uint256 hash = closedLedger->getHash (); - h.set_ledgerclosed (hash.begin (), hash.size ()); - hash = closedLedger->getParentHash (); - h.set_ledgerprevious (hash.begin (), hash.size ()); - } - - Message::pointer packet = std::make_shared ( - h, protocol::mtHELLO); - send (packet); - - return true; - } - - void addLedger (uint256 const& hash) - { - std::lock_guard sl(m_recentLock); - BOOST_FOREACH (uint256 const& ledger, m_recentLedgers) - - if (ledger == hash) - return; - - if (m_recentLedgers.size () == 128) - m_recentLedgers.pop_front (); - - m_recentLedgers.push_back (hash); - } - - void addTxSet (uint256 const& hash) - { - std::lock_guard sl(m_recentLock); - - if(std::find (m_recentTxSets.begin (), m_recentTxSets.end (), hash) != m_recentTxSets.end ()) - return; - - if (m_recentTxSets.size () == 128) - m_recentTxSets.pop_front (); - - m_recentTxSets.push_back (hash); - } - - void doFetchPack (const std::shared_ptr& packet) - { - // VFALCO TODO Invert this dependency using an observer and shared state object. - // Don't queue fetch pack jobs if we're under load or we already have - // some queued. - if (getApp().getFeeTrack ().isLoadedLocal () || - (getApp().getLedgerMaster().getValidatedLedgerAge() > 40) || - (getApp().getJobQueue().getJobCount(jtPACK) > 10)) - { - m_journal.info << "Too busy to make fetch pack"; - return; - } - - if (packet->ledgerhash ().size () != 32) - { - m_journal.warning << "FetchPack hash size malformed"; - charge (Resource::feeInvalidRequest); - return; - } - - uint256 hash; - memcpy (hash.begin (), packet->ledgerhash ().data (), 32); - - getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack", - std::bind (&NetworkOPs::makeFetchPack, &getApp().getOPs (), std::placeholders::_1, - std::weak_ptr (shared_from_this ()), packet, - hash, UptimeTimer::getInstance ().getElapsedSeconds ())); - } - - void doProofOfWork (Job&, std::weak_ptr peer, ProofOfWork::pointer pow) - { - if (peer.expired ()) - return; - - uint256 solution = pow->solve (); - - if (solution.isZero ()) - { - m_journal.warning << "Failed to solve proof of work"; - } - else - { - Peer::ptr pptr (peer.lock ()); - - if (pptr) - { - protocol::TMProofWork reply; - reply.set_token (pow->getToken ()); - reply.set_response (solution.begin (), solution.size ()); - pptr->send (std::make_shared (reply, protocol::mtPROOFOFWORK)); - } - else - { - // WRITEME: Save solved proof of work for new connection - } - } - } - - static void checkTransaction ( - Job&, - int flags, - SerializedTransaction::pointer stx, - std::weak_ptr peer) - { - try - { - if (stx->isFieldPresent(sfLastLedgerSequence) && - (stx->getFieldU32 (sfLastLedgerSequence) < - getApp().getLedgerMaster().getValidLedgerIndex())) - { // Transaction has expired - getApp().getHashRouter().setFlag(stx->getTransactionID(), SF_BAD); - charge (peer, Resource::feeUnwantedData); - return; - } - - auto validate = (flags & SF_SIGGOOD) ? Validate::NO : Validate::YES; - auto tx = std::make_shared (stx, validate); - - if (tx->getStatus () == INVALID) - { - getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); - charge (peer, Resource::feeInvalidSignature); - return; - } - else - getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_SIGGOOD); - - bool const trusted (flags & SF_TRUSTED); - getApp().getOPs ().processTransaction (tx, trusted, false, false); - } - catch (...) - { - getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); - charge (peer, Resource::feeInvalidRequest); - } - } + static + void checkTransaction (Job&, int flags, SerializedTransaction::pointer stx, + std::weak_ptr peer); // Called from our JobQueue - static void checkPropose (Job& job, Overlay* pPeers, std::shared_ptr packet, - LedgerProposal::pointer proposal, uint256 consensusLCL, RippleAddress nodePublic, - std::weak_ptr peer, bool fromCluster) - { - bool sigGood = false; - bool isTrusted = (job.getType () == jtPROPOSAL_t); + static + void + checkPropose (Job& job, Overlay* pPeers, + std::shared_ptr packet, + LedgerProposal::pointer proposal, uint256 consensusLCL, + RippleAddress nodePublic, std::weak_ptr peer, + bool fromCluster, beast::Journal journal); - WriteLog (lsTRACE, Peer) << "Checking " << - (isTrusted ? "trusted" : "UNTRUSTED") << - " proposal"; + static + void + checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val, + bool isTrusted, bool isCluster, + std::shared_ptr packet, + std::weak_ptr peer, beast::Journal journal); - assert (packet); - protocol::TMProposeSet& set = *packet; + static + void + sGetLedger (std::weak_ptr wPeer, + std::shared_ptr packet); - uint256 prevLedger; - - if (set.has_previousledger ()) - { - // proposal includes a previous ledger - WriteLog(lsTRACE, Peer) << "proposal with previous ledger"; - memcpy (prevLedger.begin (), set.previousledger ().data (), 256 / 8); - - if (!fromCluster && !proposal->checkSign (set.signature ())) - { - Peer::ptr p = peer.lock (); - WriteLog(lsWARNING, Peer) << "proposal with previous ledger fails sig check: " << - *p; - charge (peer, Resource::feeInvalidSignature); - return; - } - else - sigGood = true; - } - else - { - if (consensusLCL.isNonZero () && proposal->checkSign (set.signature ())) - { - prevLedger = consensusLCL; - sigGood = true; - } - else - { - // Could be mismatched prev ledger - WriteLog(lsWARNING, Peer) << "Ledger proposal fails signature check"; - proposal->setSignature (set.signature ()); - } - } - - if (isTrusted) - { - getApp().getOPs ().processTrustedProposal (proposal, packet, nodePublic, prevLedger, sigGood); - } - else if (sigGood && (prevLedger == consensusLCL)) - { - // relay untrusted proposal - WriteLog(lsTRACE, Peer) << "relaying UNTRUSTED proposal"; - std::set peers; - - if (getApp().getHashRouter ().swapSet ( - proposal->getSuppressionID (), peers, SF_RELAYED)) - { - pPeers->foreach (send_if_not ( - std::make_shared (set, protocol::mtPROPOSE_LEDGER), - peer_in_set(peers))); - } - } - else - { - WriteLog(lsDEBUG, Peer) << "Not relaying UNTRUSTED proposal"; - } - } - - static void checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val, bool isTrusted, bool isCluster, - std::shared_ptr packet, std::weak_ptr peer) - { - try - { - uint256 signingHash = val->getSigningHash(); - if (!isCluster && !val->isValid (signingHash)) - { - WriteLog(lsWARNING, Peer) << "Validation is invalid"; - charge (peer, Resource::feeInvalidRequest); - return; - } - - std::string source; - Peer::ptr lp = peer.lock (); - - if (lp) - source = to_string(*lp); - else - source = "unknown"; - - std::set peers; - - //---------------------------------------------------------------------- - // - { - SerializedValidation const& sv (*val); - Validators::ReceivedValidation rv; - rv.ledgerHash = sv.getLedgerHash (); - rv.publicKey = sv.getSignerPublic(); - getApp ().getValidators ().on_receive_validation (rv); - } - // - //---------------------------------------------------------------------- - - if (getApp().getOPs ().recvValidation (val, source) && - getApp().getHashRouter ().swapSet (signingHash, peers, SF_RELAYED)) - { - pPeers->foreach (send_if_not ( - std::make_shared (*packet, protocol::mtVALIDATION), - peer_in_set(peers))); - } - } - catch (...) - { - WriteLog(lsTRACE, Peer) << "Exception processing validation"; - charge (peer, Resource::feeInvalidRequest); - } - } + /** Called when we receive tx set data. */ + static + void + peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, + std::shared_ptr pPacket, + beast::Journal journal); }; //------------------------------------------------------------------------------ @@ -1165,7 +518,9 @@ const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15); // to_string should not be used we should just use lexical_cast maybe -inline std::string to_string (PeerImp const& peer) +inline +std::string +to_string (PeerImp const& peer) { if (peer.isInCluster()) return peer.getClusterNodeName(); @@ -1173,51 +528,61 @@ inline std::string to_string (PeerImp const& peer) return peer.getRemoteAddress().to_string(); } -inline std::string to_string (PeerImp const* peer) +inline +std::string +to_string (PeerImp const* peer) { return to_string (*peer); } -inline std::ostream& operator<< (std::ostream& os, PeerImp const& peer) +inline +std::ostream& +operator<< (std::ostream& os, PeerImp const& peer) { os << to_string (peer); return os; } -inline std::ostream& operator<< (std::ostream& os, PeerImp const* peer) +inline +std::ostream& +operator<< (std::ostream& os, PeerImp const* peer) { os << to_string (peer); - return os; } //------------------------------------------------------------------------------ -inline std::string to_string (Peer const& peer) +inline +std::string +to_string (Peer const& peer) { if (peer.isInCluster()) return peer.getClusterNodeName(); - return peer.getRemoteAddress().to_string(); } -inline std::string to_string (Peer const* peer) +inline +std::string +to_string (Peer const* peer) { return to_string (*peer); } -inline std::ostream& operator<< (std::ostream& os, Peer const& peer) +inline +std::ostream& +operator<< (std::ostream& os, Peer const& peer) { os << to_string (peer); - return os; } -inline std::ostream& operator<< (std::ostream& os, Peer const* peer) +inline +std::ostream& +operator<< (std::ostream& os, Peer const* peer) { os << to_string (peer); - return os; } diff --git a/src/ripple/peerfinder/Callback.h b/src/ripple/peerfinder/Callback.h deleted file mode 100644 index 40c51398b0..0000000000 --- a/src/ripple/peerfinder/Callback.h +++ /dev/null @@ -1,54 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_PEERFINDER_CALLBACK_H_INCLUDED -#define RIPPLE_PEERFINDER_CALLBACK_H_INCLUDED - -#include -#include - -namespace ripple { -namespace PeerFinder { - -/** The Callback receives PeerFinder notifications. - The notifications are sent on a thread owned by the PeerFinder, - so it is best not to do too much work in here. Just post functor - to another worker thread or job queue and return. -*/ -struct Callback -{ - /** Initiate outgoing Peer connections to the specified set of endpoints. */ - virtual void connect (IPAddresses const& addresses) = 0; - - /** Activate the handshaked peer with the specified address. */ - virtual void activate (Slot::ptr const& slot) = 0; - - /** Sends a set of Endpoint records to the specified peer. */ - virtual void send (Slot::ptr const& slot, Endpoints const& endpoints) = 0; - - /** Disconnect the handshaked peer with the specified address. - @param graceful `true` to wait for send buffers to drain before closing. - */ - virtual void disconnect (Slot::ptr const& slot, bool graceful) = 0; -}; - -} -} - -#endif diff --git a/src/ripple/peerfinder/Config.h b/src/ripple/peerfinder/Config.h deleted file mode 100644 index de325162f7..0000000000 --- a/src/ripple/peerfinder/Config.h +++ /dev/null @@ -1,77 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_PEERFINDER_CONFIG_H_INCLUDED -#define RIPPLE_PEERFINDER_CONFIG_H_INCLUDED - -namespace ripple { -namespace PeerFinder { - -/** PeerFinder configuration settings. */ -struct Config -{ - /** The largest number of public peer slots to allow. - This includes both inbound and outbound, but does not include - fixed peers. - */ - int maxPeers; - - /** The number of automatic outbound connections to maintain. - Outbound connections are only maintained if autoConnect - is `true`. The value can be fractional; The decision to round up - or down will be made using a per-process pseudorandom number and - a probability proportional to the fractional part. - Example: - If outPeers is 9.3, then 30% of nodes will maintain 9 outbound - connections, while 70% of nodes will maintain 10 outbound - connections. - */ - double outPeers; - - /** `true` if we want to accept incoming connections. */ - bool wantIncoming; - - /** `true` if we want to establish connections automatically */ - bool autoConnect; - - /** The listening port number. */ - std::uint16_t listeningPort; - - /** The set of features we advertise. */ - std::string features; - - //-------------------------------------------------------------------------- - - /** Create a configuration with default values. */ - Config (); - - /** Returns a suitable value for outPeers according to the rules. */ - double calcOutPeers () const; - - /** Adjusts the values so they follow the business rules. */ - void applyTuning (); - - /** Write the configuration into a property stream */ - void onWrite (beast::PropertyStream::Map& map); -}; - -} -} - -#endif diff --git a/src/ripple/peerfinder/Endpoint.h b/src/ripple/peerfinder/Endpoint.h deleted file mode 100644 index 6f613330c9..0000000000 --- a/src/ripple/peerfinder/Endpoint.h +++ /dev/null @@ -1,44 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_PEERFINDER_ENDPOINT_H_INCLUDED -#define RIPPLE_PEERFINDER_ENDPOINT_H_INCLUDED - -#include - -namespace ripple { -namespace PeerFinder { - -/** Describes a connectible peer address along with some metadata. */ -struct Endpoint -{ - Endpoint (); - - Endpoint (beast::IP::Endpoint const& ep, int hops_); - - int hops; - beast::IP::Endpoint address; -}; - -bool operator< (Endpoint const& lhs, Endpoint const& rhs); - -} // namespace PeerFinder -} // namespace ripple - -#endif diff --git a/src/ripple/peerfinder/Manager.h b/src/ripple/peerfinder/Manager.h index 9bc815297a..5d72c5f5be 100644 --- a/src/ripple/peerfinder/Manager.h +++ b/src/ripple/peerfinder/Manager.h @@ -20,16 +20,122 @@ #ifndef RIPPLE_PEERFINDER_MANAGER_H_INCLUDED #define RIPPLE_PEERFINDER_MANAGER_H_INCLUDED -#include -#include #include -#include #include +#include #include namespace ripple { namespace PeerFinder { +typedef beast::abstract_clock clock_type; + +/** Represents a set of addresses. */ +typedef std::vector IPAddresses; + +//------------------------------------------------------------------------------ + +/** PeerFinder configuration settings. */ +struct Config +{ + /** The largest number of public peer slots to allow. + This includes both inbound and outbound, but does not include + fixed peers. + */ + int maxPeers; + + /** The number of automatic outbound connections to maintain. + Outbound connections are only maintained if autoConnect + is `true`. The value can be fractional; The decision to round up + or down will be made using a per-process pseudorandom number and + a probability proportional to the fractional part. + Example: + If outPeers is 9.3, then 30% of nodes will maintain 9 outbound + connections, while 70% of nodes will maintain 10 outbound + connections. + */ + double outPeers; + + /** `true` if we want to accept incoming connections. */ + bool wantIncoming; + + /** `true` if we want to establish connections automatically */ + bool autoConnect; + + /** The listening port number. */ + std::uint16_t listeningPort; + + /** The set of features we advertise. */ + std::string features; + + //-------------------------------------------------------------------------- + + /** Create a configuration with default values. */ + Config (); + + /** Returns a suitable value for outPeers according to the rules. */ + double calcOutPeers () const; + + /** Adjusts the values so they follow the business rules. */ + void applyTuning (); + + /** Write the configuration into a property stream */ + void onWrite (beast::PropertyStream::Map& map); +}; + +//------------------------------------------------------------------------------ + +/** Describes a connectible peer address along with some metadata. */ +struct Endpoint +{ + Endpoint (); + + Endpoint (beast::IP::Endpoint const& ep, int hops_); + + int hops; + beast::IP::Endpoint address; +}; + +bool operator< (Endpoint const& lhs, Endpoint const& rhs); + +/** A set of Endpoint used for connecting. */ +typedef std::vector Endpoints; + +//------------------------------------------------------------------------------ + +/** The Callback receives PeerFinder notifications. + The notifications are sent on a thread owned by the PeerFinder, + so it is best not to do too much work in here. Just post functor + to another worker thread or job queue and return. +*/ +// DEPRECATED Callbacks only cause re-entrancy pain +struct Callback +{ + /** Initiate outgoing Peer connections to the specified set of endpoints. */ + virtual void connect (IPAddresses const& addresses) = 0; + + /** Activate the handshaked peer with the specified address. */ + virtual void activate (Slot::ptr const& slot) = 0; + + /** Sends a set of Endpoint records to the specified peer. */ + virtual void send (Slot::ptr const& slot, Endpoints const& endpoints) = 0; + + /** Disconnect the handshaked peer with the specified address. + @param graceful `true` to wait for send buffers to drain before closing. + */ + virtual void disconnect (Slot::ptr const& slot, bool graceful) = 0; +}; + +//------------------------------------------------------------------------------ + +/** Possible results from activating a slot. */ +enum class Result +{ + duplicate, + full, + success +}; + /** Maintains a set of IP addresses used for getting into the network. */ class Manager : public beast::Stoppable diff --git a/src/ripple/peerfinder/Types.h b/src/ripple/peerfinder/Types.h deleted file mode 100644 index 01eb7fc1ac..0000000000 --- a/src/ripple/peerfinder/Types.h +++ /dev/null @@ -1,41 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_PEERFINDER_TYPES_H_INCLUDED -#define RIPPLE_PEERFINDER_TYPES_H_INCLUDED - -#include - -#include - -namespace ripple { -namespace PeerFinder { - -/** Represents a set of addresses. */ -typedef std::vector IPAddresses; - -/** A set of Endpoint used for connecting. */ -typedef std::vector Endpoints; - -typedef beast::abstract_clock clock_type; - -} -} - -#endif diff --git a/src/ripple/peerfinder/impl/Bootcache.h b/src/ripple/peerfinder/impl/Bootcache.h index 0101a682c4..3158386277 100644 --- a/src/ripple/peerfinder/impl/Bootcache.h +++ b/src/ripple/peerfinder/impl/Bootcache.h @@ -20,7 +20,7 @@ #ifndef RIPPLE_PEERFINDER_BOOTCACHE_H_INCLUDED #define RIPPLE_PEERFINDER_BOOTCACHE_H_INCLUDED -#include +#include #include #include #include diff --git a/src/ripple/peerfinder/impl/Config.cpp b/src/ripple/peerfinder/impl/Config.cpp index a6cbb80366..d46a3260e5 100644 --- a/src/ripple/peerfinder/impl/Config.cpp +++ b/src/ripple/peerfinder/impl/Config.cpp @@ -17,7 +17,7 @@ */ //============================================================================== -#include +#include namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/impl/ConnectHandouts.cpp b/src/ripple/peerfinder/impl/ConnectHandouts.cpp deleted file mode 100644 index 78f83c2565..0000000000 --- a/src/ripple/peerfinder/impl/ConnectHandouts.cpp +++ /dev/null @@ -1,64 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include - -namespace ripple { -namespace PeerFinder { - -ConnectHandouts::ConnectHandouts ( - std::size_t needed, Squelches& squelches) - : m_needed (needed) - , m_squelches (squelches) -{ - m_list.reserve (needed); -} - -bool -ConnectHandouts::try_insert (beast::IP::Endpoint const& endpoint) -{ - if (full ()) - return false; - - // Make sure the address isn't already in our list - if (std::any_of (m_list.begin(), m_list.end(), - [&endpoint](beast::IP::Endpoint const& other) - { - // Ignore port for security reasons - return other.address() == - endpoint.address(); - })) - { - return false; - } - - // Add to squelch list so we don't try it too often. - // If its already there, then make try_insert fail. - auto const result (m_squelches.insert ( - endpoint.address())); - if (! result.second) - return false; - - m_list.push_back (endpoint); - - return true; -} - -} -} diff --git a/src/ripple/peerfinder/impl/ConnectHandouts.h b/src/ripple/peerfinder/impl/ConnectHandouts.h deleted file mode 100644 index 642f14e7e8..0000000000 --- a/src/ripple/peerfinder/impl/ConnectHandouts.h +++ /dev/null @@ -1,79 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_PEERFINDER_CONNECTHANDOUTS_H_INCLUDED -#define RIPPLE_PEERFINDER_CONNECTHANDOUTS_H_INCLUDED - -#include - -#include - -namespace ripple { -namespace PeerFinder { - -/** Receives handouts for making automatic connections. */ -class ConnectHandouts -{ -public: - // Keeps track of addresses we have made outgoing connections - // to, for the purposes of not connecting to them too frequently. - typedef beast::aged_set Squelches; - - typedef std::vector list_type; - -private: - std::size_t m_needed; - Squelches& m_squelches; - list_type m_list; - -public: - ConnectHandouts (std::size_t needed, Squelches& squelches); - - bool empty() const - { - return m_list.empty(); - } - - bool full() const - { - return m_list.size() >= m_needed; - } - - bool try_insert (Endpoint const& endpoint) - { - return try_insert (endpoint.address); - } - - list_type& list() - { - return m_list; - } - - list_type const& list() const - { - return m_list; - } - - bool try_insert (beast::IP::Endpoint const& endpoint); -}; - -} -} - -#endif diff --git a/src/ripple/peerfinder/impl/Counts.h b/src/ripple/peerfinder/impl/Counts.h index a878f6a554..27afc131ed 100644 --- a/src/ripple/peerfinder/impl/Counts.h +++ b/src/ripple/peerfinder/impl/Counts.h @@ -20,7 +20,7 @@ #ifndef RIPPLE_PEERFINDER_COUNTS_H_INCLUDED #define RIPPLE_PEERFINDER_COUNTS_H_INCLUDED -#include +#include #include namespace ripple { diff --git a/src/ripple/peerfinder/impl/Endpoint.cpp b/src/ripple/peerfinder/impl/Endpoint.cpp index 163d5307ed..07acd8c9af 100644 --- a/src/ripple/peerfinder/impl/Endpoint.cpp +++ b/src/ripple/peerfinder/impl/Endpoint.cpp @@ -17,7 +17,7 @@ */ //============================================================================== -#include +#include namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/impl/Handouts.h b/src/ripple/peerfinder/impl/Handouts.h new file mode 100644 index 0000000000..132d3ff5d6 --- /dev/null +++ b/src/ripple/peerfinder/impl/Handouts.h @@ -0,0 +1,352 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_PEERFINDER_HANDOUTS_H_INCLUDED +#define RIPPLE_PEERFINDER_HANDOUTS_H_INCLUDED + +#include +#include +#include +#include +#include +#include // + +namespace ripple { +namespace PeerFinder { + +namespace detail { + +/** Try to insert one object in the target. + When an item is handed out it is moved to the end of the container. + @return The number of objects inserted +*/ +// VFALCO TODO specialization that handles std::list for SequenceContainer +// using splice for optimization over erase/push_back +// +template +std::size_t +handout_one (Target& t, HopContainer& h) +{ + assert (! t.full()); + for (auto it = h.begin(); it != h.end(); ++it) + { + auto const& e = *it; + if (t.try_insert (e)) + { + h.move_back (it); + return 1; + } + } + return 0; +} + +} + +/** Distributes objects to targets according to business rules. + A best effort is made to evenly distribute items in the sequence + container list into the target sequence list. +*/ +template +void +handout (TargetFwdIter first, TargetFwdIter last, + SeqFwdIter seq_first, SeqFwdIter seq_last) +{ + for (;;) + { + std::size_t n (0); + for (auto si = seq_first; si != seq_last; ++si) + { + auto c = *si; + bool all_full (true); + for (auto ti = first; ti != last; ++ti) + { + auto& t = *ti; + if (! t.full()) + { + n += detail::handout_one (t, c); + all_full = false; + } + } + if (all_full) + return; + } + if (! n) + break; + } +} + +//------------------------------------------------------------------------------ + +/** Receives handouts for redirecting a connection. + An incoming connection request is redirected when we are full on slots. +*/ +class RedirectHandouts +{ +public: + template + explicit + RedirectHandouts (SlotImp::ptr const& slot); + + template + bool try_insert (Endpoint const& ep); + + bool full () const + { + return list_.size() >= Tuning::redirectEndpointCount; + } + + SlotImp::ptr const& slot () const + { + return slot_; + } + + std::vector const& list() const + { + return list_; + } + +private: + SlotImp::ptr slot_; + std::vector list_; +}; + +template +RedirectHandouts::RedirectHandouts (SlotImp::ptr const& slot) + : slot_ (slot) +{ + list_.reserve (Tuning::redirectEndpointCount); +} + +template +bool +RedirectHandouts::try_insert (Endpoint const& ep) +{ + if (full ()) + return false; + + // VFALCO NOTE This check can be removed when we provide the + // addresses in a peer HTTP handshake instead of + // the tmENDPOINTS message. + // + if (ep.hops > Tuning::maxHops) + return false; + + // Don't send them our address + if (ep.hops == 0) + return false; + + // Don't send them their own address + if (slot_->remote_endpoint().address() == + ep.address.address()) + return false; + + // Make sure the address isn't already in our list + if (std::any_of (list_.begin(), list_.end(), + [&ep](Endpoint const& other) + { + // Ignore port for security reasons + return other.address.address() == ep.address.address(); + })) + { + return false; + } + + list_.emplace_back (ep.address, ep.hops); + + return true; +} + +//------------------------------------------------------------------------------ + +/** Receives endpoints for a slot during periodic handouts. */ +class SlotHandouts +{ +public: + template + explicit + SlotHandouts (SlotImp::ptr const& slot); + + template + bool try_insert (Endpoint const& ep); + + bool full () const + { + return list_.size() >= Tuning::numberOfEndpoints; + } + + void insert (Endpoint const& ep) + { + list_.push_back (ep); + } + + SlotImp::ptr const& slot () const + { + return slot_; + } + + std::vector const& list() const + { + return list_; + } + +private: + SlotImp::ptr slot_; + std::vector list_; +}; + +template +SlotHandouts::SlotHandouts (SlotImp::ptr const& slot) + : slot_ (slot) +{ + list_.reserve (Tuning::numberOfEndpoints); +} + +template +bool +SlotHandouts::try_insert (Endpoint const& ep) +{ + if (full ()) + return false; + + if (ep.hops > Tuning::maxHops) + return false; + + if (slot_->recent.filter (ep.address, ep.hops)) + return false; + + // Don't send them their own address + if (slot_->remote_endpoint().address() == + ep.address.address()) + return false; + + // Make sure the address isn't already in our list + if (std::any_of (list_.begin(), list_.end(), + [&ep](Endpoint const& other) + { + // Ignore port for security reasons + return other.address.address() == ep.address.address(); + })) + return false; + + list_.emplace_back (ep.address, ep.hops); + + // Insert into this slot's recent table. Although the endpoint + // didn't come from the slot, adding it to the slot's table + // prevents us from sending it again until it has expired from + // the other end's cache. + // + slot_->recent.insert (ep.address, ep.hops); + + return true; +} + +//------------------------------------------------------------------------------ + +/** Receives handouts for making automatic connections. */ +class ConnectHandouts +{ +public: + // Keeps track of addresses we have made outgoing connections + // to, for the purposes of not connecting to them too frequently. + typedef beast::aged_set Squelches; + + typedef std::vector list_type; + +private: + std::size_t m_needed; + Squelches& m_squelches; + list_type m_list; + +public: + template + ConnectHandouts (std::size_t needed, Squelches& squelches); + + template + bool try_insert (beast::IP::Endpoint const& endpoint); + + bool empty() const + { + return m_list.empty(); + } + + bool full() const + { + return m_list.size() >= m_needed; + } + + bool try_insert (Endpoint const& endpoint) + { + return try_insert (endpoint.address); + } + + list_type& list() + { + return m_list; + } + + list_type const& list() const + { + return m_list; + } +}; + +template +ConnectHandouts::ConnectHandouts ( + std::size_t needed, Squelches& squelches) + : m_needed (needed) + , m_squelches (squelches) +{ + m_list.reserve (needed); +} + +template +bool +ConnectHandouts::try_insert (beast::IP::Endpoint const& endpoint) +{ + if (full ()) + return false; + + // Make sure the address isn't already in our list + if (std::any_of (m_list.begin(), m_list.end(), + [&endpoint](beast::IP::Endpoint const& other) + { + // Ignore port for security reasons + return other.address() == + endpoint.address(); + })) + { + return false; + } + + // Add to squelch list so we don't try it too often. + // If its already there, then make try_insert fail. + auto const result (m_squelches.insert ( + endpoint.address())); + if (! result.second) + return false; + + m_list.push_back (endpoint); + + return true; +} + +} +} + +#endif diff --git a/src/ripple/peerfinder/impl/Livecache.h b/src/ripple/peerfinder/impl/Livecache.h index 181241d9b7..aa44b9bfae 100644 --- a/src/ripple/peerfinder/impl/Livecache.h +++ b/src/ripple/peerfinder/impl/Livecache.h @@ -20,8 +20,7 @@ #ifndef RIPPLE_PEERFINDER_LIVECACHE_H_INCLUDED #define RIPPLE_PEERFINDER_LIVECACHE_H_INCLUDED -#include -#include +#include #include #include #include diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index 448bb01a64..57cde0e07b 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -20,15 +20,12 @@ #ifndef RIPPLE_PEERFINDER_LOGIC_H_INCLUDED #define RIPPLE_PEERFINDER_LOGIC_H_INCLUDED -#include -#include +#include #include #include -#include #include -#include +#include #include -#include #include #include diff --git a/src/ripple/peerfinder/impl/RedirectHandouts.cpp b/src/ripple/peerfinder/impl/RedirectHandouts.cpp deleted file mode 100644 index b163aaebef..0000000000 --- a/src/ripple/peerfinder/impl/RedirectHandouts.cpp +++ /dev/null @@ -1,70 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include - -namespace ripple { -namespace PeerFinder { - -RedirectHandouts::RedirectHandouts (SlotImp::ptr const& slot) - : m_slot (slot) -{ - m_list.reserve (Tuning::redirectEndpointCount); -} - -bool -RedirectHandouts::try_insert (Endpoint const& ep) -{ - if (full ()) - return false; - - // VFALCO NOTE This check can be removed when we provide the - // addresses in a peer HTTP handshake instead of - // the tmENDPOINTS message. - // - if (ep.hops > Tuning::maxHops) - return false; - - // Don't send them our address - if (ep.hops == 0) - return false; - - // Don't send them their own address - if (m_slot->remote_endpoint().address() == - ep.address.address()) - return false; - - // Make sure the address isn't already in our list - if (std::any_of (m_list.begin(), m_list.end(), - [&ep](Endpoint const& other) - { - // Ignore port for security reasons - return other.address.address() == ep.address.address(); - })) - { - return false; - } - - m_list.emplace_back (ep.address, ep.hops); - - return true; -} - -} -} diff --git a/src/ripple/peerfinder/impl/RedirectHandouts.h b/src/ripple/peerfinder/impl/RedirectHandouts.h deleted file mode 100644 index be645469fd..0000000000 --- a/src/ripple/peerfinder/impl/RedirectHandouts.h +++ /dev/null @@ -1,62 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_PEERFINDER_REDIRECTHANDOUTS_H_INCLUDED -#define RIPPLE_PEERFINDER_REDIRECTHANDOUTS_H_INCLUDED - -#include -#include - -namespace ripple { -namespace PeerFinder { - -/** Receives handouts for redirecting a connection. - An incoming connection request is redirected when we are full on slots. -*/ -class RedirectHandouts -{ -public: - RedirectHandouts (SlotImp::ptr const& slot); - - bool full () const - { - return m_list.size() >= Tuning::redirectEndpointCount; - } - - SlotImp::ptr const& slot () const - { - return m_slot; - } - - std::vector const& list() const - { - return m_list; - } - - bool try_insert (Endpoint const& ep); - -private: - SlotImp::ptr m_slot; - std::vector m_list; -}; - -} -} - -#endif diff --git a/src/ripple/peerfinder/impl/SlotHandouts.cpp b/src/ripple/peerfinder/impl/SlotHandouts.cpp deleted file mode 100644 index 754c800de8..0000000000 --- a/src/ripple/peerfinder/impl/SlotHandouts.cpp +++ /dev/null @@ -1,70 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include - -namespace ripple { -namespace PeerFinder { - -SlotHandouts::SlotHandouts (SlotImp::ptr const& slot) - : m_slot (slot) -{ - m_list.reserve (Tuning::numberOfEndpoints); -} - -bool -SlotHandouts::try_insert (Endpoint const& ep) -{ - if (full ()) - return false; - - if (ep.hops > Tuning::maxHops) - return false; - - if (m_slot->recent.filter (ep.address, ep.hops)) - return false; - - // Don't send them their own address - if (m_slot->remote_endpoint().address() == - ep.address.address()) - return false; - - // Make sure the address isn't already in our list - if (std::any_of (m_list.begin(), m_list.end(), - [&ep](Endpoint const& other) - { - // Ignore port for security reasons - return other.address.address() == ep.address.address(); - })) - return false; - - m_list.emplace_back (ep.address, ep.hops); - - // Insert into this slot's recent table. Although the endpoint - // didn't come from the slot, adding it to the slot's table - // prevents us from sending it again until it has expired from - // the other end's cache. - // - m_slot->recent.insert (ep.address, ep.hops); - - return true; -} - -} -} diff --git a/src/ripple/peerfinder/impl/SlotHandouts.h b/src/ripple/peerfinder/impl/SlotHandouts.h deleted file mode 100644 index 45b140ccbb..0000000000 --- a/src/ripple/peerfinder/impl/SlotHandouts.h +++ /dev/null @@ -1,65 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_PEERFINDER_SLOTHANDOUTS_H_INCLUDED -#define RIPPLE_PEERFINDER_SLOTHANDOUTS_H_INCLUDED - -#include -#include - -namespace ripple { -namespace PeerFinder { - -/** Functor to receive endpoints for a slot during handout. */ -class SlotHandouts -{ -public: - explicit SlotHandouts (SlotImp::ptr const& slot); - - bool full () const - { - return m_list.size() >= Tuning::numberOfEndpoints; - } - - void insert (Endpoint const& ep) - { - m_list.push_back (ep); - } - - SlotImp::ptr const& slot () const - { - return m_slot; - } - - std::vector const& list() const - { - return m_list; - } - - bool try_insert (Endpoint const& ep); - -private: - SlotImp::ptr m_slot; - std::vector m_list; -}; - -} -} - -#endif diff --git a/src/ripple/peerfinder/impl/handout.h b/src/ripple/peerfinder/impl/handout.h deleted file mode 100644 index 89f14c6912..0000000000 --- a/src/ripple/peerfinder/impl/handout.h +++ /dev/null @@ -1,88 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_PEERFINDER_HANDOUT_H_INCLUDED -#define RIPPLE_PEERFINDER_HANDOUT_H_INCLUDED - -namespace ripple { -namespace PeerFinder { - -namespace detail { - -/** Tries to insert one object in the target. - When an item is handed out it is moved to the end of the container. - @return The number of objects handed out -*/ -// VFALCO TODO specialization that handles std::list for SequenceContainer -// using splice for optimization over erase/push_back -// -template -std::size_t handout_one (Target& t, HopContainer& h) -{ - assert (! t.full()); - for (auto hi (h.begin()); hi != h.end(); ++hi) - { - auto const& e (*hi); - if (t.try_insert (e)) - { - h.move_back (hi); - return 1; - } - } - return 0; -} - -} - -/** Distributes objects to targets according to business rules. - A best effort is made to evenly distribute items in the sequence - container list into the target sequence list. -*/ -template -void handout (TargetFwdIter first, TargetFwdIter last, - SeqFwdIter seq_first, SeqFwdIter seq_last) -{ - for (;;) - { - std::size_t n (0); - for (auto si (seq_first); si != seq_last; ++si) - { - auto c (*si); - bool all_full (true); - for (auto ti (first); ti != last; ++ti) - { - auto& t (*ti); - if (! t.full()) - { - n += detail::handout_one (t, c); - all_full = false; - } - } - if (all_full) - return; - } - if (! n) - break; - } -} - -} -} - -#endif diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 47aefd6d4b..d58dd17ed6 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -32,6 +32,11 @@ enum MessageType // token, response = give solution to proof of work // token, result = report result of pow +//------------------------------------------------------------------------------ + +/* Requests or responds to a proof of work. + Unimplemented and unused currently. +*/ message TMProofWork { required string token = 1; @@ -52,6 +57,8 @@ message TMProofWork optional PowResult result = 6; } +//------------------------------------------------------------------------------ + // Sent on connect message TMHello { diff --git a/src/ripple/unity/peerfinder.cpp b/src/ripple/unity/peerfinder.cpp index 952862728f..fb1d70b759 100644 --- a/src/ripple/unity/peerfinder.cpp +++ b/src/ripple/unity/peerfinder.cpp @@ -28,12 +28,9 @@ #include #include #include -#include #include #include #include -#include -#include #include #include