PeerFinder work

This commit is contained in:
Vinnie Falco
2013-09-30 09:35:29 -07:00
parent 942336c454
commit ea2589dd9c
3 changed files with 48 additions and 46 deletions

View File

@@ -17,12 +17,13 @@
*/
//==============================================================================
SETUP_LOG (Peer)
class PeerImp;
// Don't try to run past receiving nonsense from a peer
// #define TRUST_NETWORKHEAD
// #define TRUST_NETWORK
// Node has this long to verify its identity from connection accepted or connection attempt.
#define NODE_VERIFY_SECONDS 15
@@ -81,7 +82,7 @@ public:
{
WriteLog (lsDEBUG, Peer) << "CREATING PEER: " << addressToString (this);
}
//---------------------------------------------------------------------------
private:
bool mClientConnect; // In process of connecting as client.
@@ -117,7 +118,7 @@ private:
bool m_remoteAddressSet;
IPEndpoint m_remoteAddress;
public:
static char const* getCountedObjectName () { return "Peer"; }
@@ -206,6 +207,11 @@ public:
return (uMin >= mMinLedger) && (uMax <= mMaxLedger);
}
IPEndpoint getPeerEndpoint() const
{
return m_remoteAddress;
}
private:
void handleShutdown (const boost::system::error_code & error)
{
@@ -294,6 +300,17 @@ private:
}
else
{
boost::asio::ip::address addr (getNativeSocket().remote_endpoint().address());
if (addr.is_v4())
{
boost::asio::ip::address_v4::bytes_type bytes (addr.to_v4().to_bytes());
m_remoteAddress = IPEndpoint (IPEndpoint::V4 (
bytes[0], bytes[1], bytes[2], bytes[3]), 0);
}
m_remoteAddressSet = true;
if (m_socket->getFlags ().set (MultiSocket::Flag::proxy) && m_isInbound)
{
MultiSocket::ProxyInfo const proxyInfo (m_socket->getProxyInfo ());
@@ -464,8 +481,6 @@ void PeerImp::detach (const char* rsn, bool onIOStrand)
{
getApp().getPeers ().peerDisconnected (shared_from_this (), mNodePublic);
getApp().getPeers().getPeerFinder().onPeerDisconnected (RipplePublicKey (mNodePublic));
mNodePublic.clear (); // Be idempotent.
}
@@ -862,10 +877,10 @@ void PeerImp::processReadBuffer ()
if(msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvEndpoints (msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
WriteLog (lsWARNING, Peer) << "parse error: " << type;;
}
break;
case protocol::mtSEARCH_TRANSACTION:
{
event->reName ("Peer::searchtransaction");
@@ -1151,23 +1166,6 @@ void PeerImp::recvHello (protocol::TMHello& packet)
// Consider us connected. No longer accepting mtHELLO.
mHelloed = true;
{
if (! m_remoteAddressSet)
{
boost::asio::ip::address addr (
getNativeSocket().remote_endpoint().address());
if (addr.is_v4())
{
boost::asio::ip::address_v4::bytes_type bytes (addr.to_v4().to_bytes());
m_remoteAddress = IPEndpoint (IPEndpoint::V4 (
bytes[0], bytes[1], bytes[2], bytes[3]), 0);
}
}
getApp().getPeers().getPeerFinder().onPeerConnected (
RipplePublicKey (mNodePublic), m_remoteAddress, m_isInbound);
}
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection type.)
@@ -1679,15 +1677,6 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet)
// hops
endpoint.hops = tm.hops();
// slots
endpoint.incomingSlotsAvailable = tm.slots();
// maxSlots
endpoint.incomingSlotsMax = tm.maxslots();
// uptimeMinutes
endpoint.uptimeMinutes = tm.uptimeminutes();
// ipv4
if (endpoint.hops > 0)
{
@@ -1696,8 +1685,6 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet)
IPEndpoint::V4 v4 (ntohl (addr.s_addr));
endpoint.address = IPEndpoint (v4, 0);
endpoint.port = tm.ipv4().ipv4port ();
endpoints.push_back (endpoint);
}
else
{
@@ -1707,13 +1694,21 @@ void PeerImp::recvEndpoints (protocol::TMEndpoints& packet)
// then we'll verify that their listener can receive incoming
// by performing a connectivity test.
//
if (m_remoteAddressSet)
{
endpoint.address = m_remoteAddress.withPort (0);
endpoint.port = tm.ipv4().ipv4port ();
endpoints.push_back (endpoint);
}
bassert (m_remoteAddressSet);
endpoint.address = m_remoteAddress.withPort (0);
endpoint.port = tm.ipv4().ipv4port ();
}
// slots
endpoint.incomingSlotsAvailable = tm.slots();
// maxSlots
endpoint.incomingSlotsMax = tm.maxslots();
// uptimeMinutes
endpoint.uptimeMinutes = tm.uptimeminutes();
endpoints.push_back (endpoint);
}
getApp().getPeers().getPeerFinder().onPeerEndpoints (

View File

@@ -103,6 +103,8 @@ public:
virtual bool hasRange (uint32 uMin, uint32 uMax) = 0;
virtual IPEndpoint getPeerEndpoint() const = 0;
//--------------------------------------------------------------------------
typedef boost::asio::ip::tcp::socket NativeSocketType;

View File

@@ -37,9 +37,10 @@ public:
typedef RippleRecursiveMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
typedef std::pair<RippleAddress, Peer::pointer> naPeer;
typedef std::pair<IPAndPortNumber, Peer::pointer> pipPeer;
typedef std::pair<RippleAddress, Peer::pointer> naPeer;
typedef std::pair<IPAndPortNumber, Peer::pointer> pipPeer;
typedef std::map<IPAndPortNumber, Peer::pointer>::value_type vtPeer;
typedef boost::unordered_map<RippleAddress, Peer::pointer>::value_type vtConMap;
ScopedPointer <PeerFinder::Manager> m_peerFinder;
@@ -60,7 +61,6 @@ public:
// Non-thin peers which we are connected to.
// PeersImp we have the public key for.
typedef boost::unordered_map<RippleAddress, Peer::pointer>::value_type vtConMap;
boost::unordered_map<RippleAddress, Peer::pointer> mConnectedMap;
// Connections with have a 64-bit identifier
@@ -594,7 +594,7 @@ void PeersImp::relayMessageTo (const std::set<uint64>& fromPeers, const PackedMe
}
// Schedule a connection via scanning.
//
//addr.to_v4().to_bytes()
// Add or modify into PeerIps as a manual entry for immediate scanning.
// Requires sane IP and port.
void PeersImp::connectTo (const std::string& strIp, int iPort)
@@ -718,6 +718,9 @@ bool PeersImp::peerConnected (Peer::ref peer, const RippleAddress& naPeer,
mConnectedMap[naPeer] = peer;
bNew = true;
// Notify peerfinder since this is a connection that we didn't know about and are keeping
getPeerFinder ().onPeerConnected (RipplePublicKey (peer->getNodePublic()), peer->getPeerEndpoint(), peer->isInbound());
assert (peer->getPeerId () != 0);
mPeerIdMap.insert (std::make_pair (peer->getPeerId (), peer));
}
@@ -778,7 +781,9 @@ void PeersImp::peerDisconnected (Peer::ref peer, const RippleAddress& naPeer)
}
else
{
// Found it. Delete it.
// Found it. Notify peerfinder, then delete it.
getPeerFinder ().onPeerDisconnected (RipplePublicKey (itCm->first));
mConnectedMap.erase (itCm);
//WriteLog (lsINFO, Peers) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort();