From ea2589dd9c45b379a2eb5135105b05661a99704a Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Mon, 30 Sep 2013 09:35:29 -0700 Subject: [PATCH] PeerFinder work --- src/ripple_app/peers/Peer.cpp | 77 ++++++++++++++++------------------ src/ripple_app/peers/Peer.h | 2 + src/ripple_app/peers/Peers.cpp | 15 ++++--- 3 files changed, 48 insertions(+), 46 deletions(-) diff --git a/src/ripple_app/peers/Peer.cpp b/src/ripple_app/peers/Peer.cpp index 47b946b0e..b826d707d 100644 --- a/src/ripple_app/peers/Peer.cpp +++ b/src/ripple_app/peers/Peer.cpp @@ -17,12 +17,13 @@ */ //============================================================================== + SETUP_LOG (Peer) class PeerImp; // Don't try to run past receiving nonsense from a peer -// #define TRUST_NETWORKHEAD +// #define TRUST_NETWORK // Node has this long to verify its identity from connection accepted or connection attempt. #define NODE_VERIFY_SECONDS 15 @@ -81,7 +82,7 @@ public: { WriteLog (lsDEBUG, Peer) << "CREATING PEER: " << addressToString (this); } - + //--------------------------------------------------------------------------- private: bool mClientConnect; // In process of connecting as client. @@ -117,7 +118,7 @@ private: bool m_remoteAddressSet; IPEndpoint m_remoteAddress; - + public: static char const* getCountedObjectName () { return "Peer"; } @@ -206,6 +207,11 @@ public: return (uMin >= mMinLedger) && (uMax <= mMaxLedger); } + IPEndpoint getPeerEndpoint() const + { + return m_remoteAddress; + } + private: void handleShutdown (const boost::system::error_code & error) { @@ -294,6 +300,17 @@ private: } else { + boost::asio::ip::address addr (getNativeSocket().remote_endpoint().address()); + + if (addr.is_v4()) + { + boost::asio::ip::address_v4::bytes_type bytes (addr.to_v4().to_bytes()); + m_remoteAddress = IPEndpoint (IPEndpoint::V4 ( + bytes[0], bytes[1], bytes[2], bytes[3]), 0); + } + + m_remoteAddressSet = true; + if (m_socket->getFlags ().set (MultiSocket::Flag::proxy) && m_isInbound) { MultiSocket::ProxyInfo const proxyInfo (m_socket->getProxyInfo ()); @@ -464,8 +481,6 @@ void PeerImp::detach (const char* rsn, bool onIOStrand) { getApp().getPeers ().peerDisconnected (shared_from_this (), mNodePublic); - getApp().getPeers().getPeerFinder().onPeerDisconnected (RipplePublicKey (mNodePublic)); - mNodePublic.clear (); // Be idempotent. } @@ -862,10 +877,10 @@ void PeerImp::processReadBuffer () if(msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvEndpoints (msg); else - WriteLog (lsWARNING, Peer) << "parse error: " << type; + WriteLog (lsWARNING, Peer) << "parse error: " << type;; } break; - + case protocol::mtSEARCH_TRANSACTION: { event->reName ("Peer::searchtransaction"); @@ -1151,23 +1166,6 @@ void PeerImp::recvHello (protocol::TMHello& packet) // Consider us connected. No longer accepting mtHELLO. mHelloed = true; - { - if (! m_remoteAddressSet) - { - boost::asio::ip::address addr ( - getNativeSocket().remote_endpoint().address()); - if (addr.is_v4()) - { - boost::asio::ip::address_v4::bytes_type bytes (addr.to_v4().to_bytes()); - m_remoteAddress = IPEndpoint (IPEndpoint::V4 ( - bytes[0], bytes[1], bytes[2], bytes[3]), 0); - } - } - - getApp().getPeers().getPeerFinder().onPeerConnected ( - RipplePublicKey (mNodePublic), m_remoteAddress, m_isInbound); - } - // XXX Set timer: connection is in grace period to be useful. // XXX Set timer: connection idle (idle may vary depending on connection type.) @@ -1679,15 +1677,6 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet) // hops endpoint.hops = tm.hops(); - // slots - endpoint.incomingSlotsAvailable = tm.slots(); - - // maxSlots - endpoint.incomingSlotsMax = tm.maxslots(); - - // uptimeMinutes - endpoint.uptimeMinutes = tm.uptimeminutes(); - // ipv4 if (endpoint.hops > 0) { @@ -1696,8 +1685,6 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet) IPEndpoint::V4 v4 (ntohl (addr.s_addr)); endpoint.address = IPEndpoint (v4, 0); endpoint.port = tm.ipv4().ipv4port (); - - endpoints.push_back (endpoint); } else { @@ -1707,13 +1694,21 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet) // then we'll verify that their listener can receive incoming // by performing a connectivity test. // - if (m_remoteAddressSet) - { - endpoint.address = m_remoteAddress.withPort (0); - endpoint.port = tm.ipv4().ipv4port (); - endpoints.push_back (endpoint); - } + bassert (m_remoteAddressSet); + endpoint.address = m_remoteAddress.withPort (0); + endpoint.port = tm.ipv4().ipv4port (); } + + // slots + endpoint.incomingSlotsAvailable = tm.slots(); + + // maxSlots + endpoint.incomingSlotsMax = tm.maxslots(); + + // uptimeMinutes + endpoint.uptimeMinutes = tm.uptimeminutes(); + + endpoints.push_back (endpoint); } getApp().getPeers().getPeerFinder().onPeerEndpoints ( diff --git a/src/ripple_app/peers/Peer.h b/src/ripple_app/peers/Peer.h index 775276992..acedb07d6 100644 --- a/src/ripple_app/peers/Peer.h +++ b/src/ripple_app/peers/Peer.h @@ -103,6 +103,8 @@ public: virtual bool hasRange (uint32 uMin, uint32 uMax) = 0; + virtual IPEndpoint getPeerEndpoint() const = 0; + //-------------------------------------------------------------------------- typedef boost::asio::ip::tcp::socket NativeSocketType; diff --git a/src/ripple_app/peers/Peers.cpp b/src/ripple_app/peers/Peers.cpp index 8a007fe56..005392fe0 100644 --- a/src/ripple_app/peers/Peers.cpp +++ b/src/ripple_app/peers/Peers.cpp @@ -37,9 +37,10 @@ public: typedef RippleRecursiveMutex LockType; typedef LockType::ScopedLockType ScopedLockType; - typedef std::pair naPeer; - typedef std::pair pipPeer; + typedef std::pair naPeer; + typedef std::pair pipPeer; typedef std::map::value_type vtPeer; + typedef boost::unordered_map::value_type vtConMap; ScopedPointer m_peerFinder; @@ -60,7 +61,6 @@ public: // Non-thin peers which we are connected to. // PeersImp we have the public key for. - typedef boost::unordered_map::value_type vtConMap; boost::unordered_map mConnectedMap; // Connections with have a 64-bit identifier @@ -594,7 +594,7 @@ void PeersImp::relayMessageTo (const std::set& fromPeers, const PackedMe } // Schedule a connection via scanning. -// +//addr.to_v4().to_bytes() // Add or modify into PeerIps as a manual entry for immediate scanning. // Requires sane IP and port. void PeersImp::connectTo (const std::string& strIp, int iPort) @@ -718,6 +718,9 @@ bool PeersImp::peerConnected (Peer::ref peer, const RippleAddress& naPeer, mConnectedMap[naPeer] = peer; bNew = true; + // Notify peerfinder since this is a connection that we didn't know about and are keeping + getPeerFinder ().onPeerConnected (RipplePublicKey (peer->getNodePublic()), peer->getPeerEndpoint(), peer->isInbound()); + assert (peer->getPeerId () != 0); mPeerIdMap.insert (std::make_pair (peer->getPeerId (), peer)); } @@ -778,7 +781,9 @@ void PeersImp::peerDisconnected (Peer::ref peer, const RippleAddress& naPeer) } else { - // Found it. Delete it. + // Found it. Notify peerfinder, then delete it. + getPeerFinder ().onPeerDisconnected (RipplePublicKey (itCm->first)); + mConnectedMap.erase (itCm); //WriteLog (lsINFO, Peers) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort();