PeerFinder work

This commit is contained in:
Vinnie Falco
2013-09-30 09:35:29 -07:00
parent 41879d8511
commit 4fe63f9f0d
44 changed files with 2327 additions and 489 deletions

View File

@@ -17,13 +17,12 @@
*/
//==============================================================================
SETUP_LOG (Peer)
class PeerImp;
// Don't try to run past receiving nonsense from a peer
// #define TRUST_NETWORK
// #define TRUST_NETWORKHEAD
// Node has this long to verify its identity from connection accepted or connection attempt.
#define NODE_VERIFY_SECONDS 15
@@ -41,11 +40,6 @@ private:
public:
//---------------------------------------------------------------------------
//
//
//
ScopedPointer <MultiSocket> m_socket;
boost::asio::io_service::strand m_strand;
@@ -64,12 +58,6 @@ public:
return *m_socket;
}
//
//
//
//---------------------------------------------------------------------------
public:
PeerImp (boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context,
uint64 peerID,
@@ -89,6 +77,7 @@ public:
, mMinLedger (0)
, mMaxLedger (0)
, mActivityTimer (io_service)
, m_remoteAddressSet (false)
{
WriteLog (lsDEBUG, Peer) << "CREATING PEER: " << addressToString (this);
}
@@ -126,6 +115,9 @@ private:
protocol::TMStatusChange mLastStatus;
protocol::TMHello mHello;
bool m_remoteAddressSet;
IPEndpoint m_remoteAddress;
public:
static char const* getCountedObjectName () { return "Peer"; }
@@ -308,6 +300,14 @@ private:
if (proxyInfo.protocol == "TCP4")
{
m_remoteAddressSet = true;
m_remoteAddress = IPEndpoint (IPEndpoint::V4 (
proxyInfo.sourceAddress.value [0],
proxyInfo.sourceAddress.value [1],
proxyInfo.sourceAddress.value [2],
proxyInfo.sourceAddress.value [3]),
proxyInfo.sourcePort);
// Set remote IP and port number from PROXY handshake
mIpPort.first = proxyInfo.sourceAddress.toString ().toStdString ();
mIpPort.second = proxyInfo.sourcePort;
@@ -362,6 +362,7 @@ private:
void recvGetContacts (protocol::TMGetContacts & packet);
void recvGetPeers (protocol::TMGetPeers & packet, Application::ScopedLockType& masterLockHolder);
void recvPeers (protocol::TMPeers & packet);
void recvEndpoints (protocol::TMEndpoints & packet);
void recvGetObjectByHash (const boost::shared_ptr<protocol::TMGetObjectByHash>& packet);
void recvPing (protocol::TMPing & packet);
void recvErrorMessage (protocol::TMErrorMsg & packet);
@@ -390,7 +391,7 @@ void PeerImp::handleWrite (const boost::system::error_code& error, size_t bytes_
// Call on IO strand
#ifdef BEAST_DEBUG
// if (!error)
// Log::out() << "PeerImp::handleWrite bytes: "<< bytes_transferred;
// Log::out() << "Peer::handleWrite bytes: "<< bytes_transferred;
#endif
mSendingPacket.reset ();
@@ -463,6 +464,8 @@ void PeerImp::detach (const char* rsn, bool onIOStrand)
{
getApp().getPeers ().peerDisconnected (shared_from_this (), mNodePublic);
getApp().getPeers().getPeerFinder().onPeerDisconnected (RipplePublicKey (mNodePublic));
mNodePublic.clear (); // Be idempotent.
}
@@ -739,9 +742,9 @@ void PeerImp::processReadBuffer ()
// Log::out() << "PRB(" << type << "), len=" << (mReadbuf.size()-PackedMessage::kHeaderBytes);
#endif
// Log::out() << "PeerImp::processReadBuffer: " << mIpPort.first << " " << mIpPort.second;
// Log::out() << "Peer::processReadBuffer: " << mIpPort.first << " " << mIpPort.second;
LoadEvent::autoptr event (getApp().getJobQueue ().getLoadEventAP (jtPEER, "PeerImp::read"));
LoadEvent::autoptr event (getApp().getJobQueue ().getLoadEventAP (jtPEER, "Peer::read"));
{
Application::ScopedLockType lock (getApp ().getMasterLock (), __FILE__, __LINE__);
@@ -758,7 +761,7 @@ void PeerImp::processReadBuffer ()
{
case protocol::mtHELLO:
{
event->reName ("PeerImp::hello");
event->reName ("Peer::hello");
protocol::TMHello msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -770,7 +773,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtCLUSTER:
{
event->reName ("PeerImp::cluster");
event->reName ("Peer::cluster");
protocol::TMCluster msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -781,7 +784,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtERROR_MSG:
{
event->reName ("PeerImp::errormessage");
event->reName ("Peer::errormessage");
protocol::TMErrorMsg msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -793,7 +796,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtPING:
{
event->reName ("PeerImp::ping");
event->reName ("Peer::ping");
protocol::TMPing msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -805,7 +808,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtGET_CONTACTS:
{
event->reName ("PeerImp::getcontacts");
event->reName ("Peer::getcontacts");
protocol::TMGetContacts msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -817,7 +820,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtCONTACT:
{
event->reName ("PeerImp::contact");
event->reName ("Peer::contact");
protocol::TMContact msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -829,7 +832,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtGET_PEERS:
{
event->reName ("PeerImp::getpeers");
event->reName ("Peer::getpeers");
protocol::TMGetPeers msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -841,7 +844,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtPEERS:
{
event->reName ("PeerImp::peers");
event->reName ("Peer::peers");
protocol::TMPeers msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -851,9 +854,21 @@ void PeerImp::processReadBuffer ()
}
break;
case protocol::mtENDPOINTS:
{
event->reName ("Peer::endpoints");
protocol::TMEndpoints msg;
if(msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvEndpoints (msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
}
break;
case protocol::mtSEARCH_TRANSACTION:
{
event->reName ("PeerImp::searchtransaction");
event->reName ("Peer::searchtransaction");
protocol::TMSearchTransaction msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -865,7 +880,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtGET_ACCOUNT:
{
event->reName ("PeerImp::getaccount");
event->reName ("Peer::getaccount");
protocol::TMGetAccount msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -877,7 +892,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtACCOUNT:
{
event->reName ("PeerImp::account");
event->reName ("Peer::account");
protocol::TMAccount msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -889,7 +904,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtTRANSACTION:
{
event->reName ("PeerImp::transaction");
event->reName ("Peer::transaction");
protocol::TMTransaction msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -901,7 +916,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtSTATUS_CHANGE:
{
event->reName ("PeerImp::statuschange");
event->reName ("Peer::statuschange");
protocol::TMStatusChange msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -913,7 +928,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtPROPOSE_LEDGER:
{
event->reName ("PeerImp::propose");
event->reName ("Peer::propose");
boost::shared_ptr<protocol::TMProposeSet> msg = boost::make_shared<protocol::TMProposeSet> ();
if (msg->ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -925,7 +940,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtGET_LEDGER:
{
event->reName ("PeerImp::getledger");
event->reName ("Peer::getledger");
protocol::TMGetLedger msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -937,7 +952,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtLEDGER_DATA:
{
event->reName ("PeerImp::ledgerdata");
event->reName ("Peer::ledgerdata");
boost::shared_ptr<protocol::TMLedgerData> msg = boost::make_shared<protocol::TMLedgerData> ();
if (msg->ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -949,7 +964,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtHAVE_SET:
{
event->reName ("PeerImp::haveset");
event->reName ("Peer::haveset");
protocol::TMHaveTransactionSet msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -961,7 +976,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtVALIDATION:
{
event->reName ("PeerImp::validation");
event->reName ("Peer::validation");
boost::shared_ptr<protocol::TMValidation> msg = boost::make_shared<protocol::TMValidation> ();
if (msg->ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -987,7 +1002,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtGET_OBJECTS:
{
event->reName ("PeerImp::getobjects");
event->reName ("Peer::getobjects");
boost::shared_ptr<protocol::TMGetObjectByHash> msg = boost::make_shared<protocol::TMGetObjectByHash> ();
if (msg->ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -999,7 +1014,7 @@ void PeerImp::processReadBuffer ()
case protocol::mtPROOFOFWORK:
{
event->reName ("PeerImp::proofofwork");
event->reName ("Peer::proofofwork");
protocol::TMProofWork msg;
if (msg.ParseFromArray (&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size () - PackedMessage::kHeaderBytes))
@@ -1011,7 +1026,7 @@ void PeerImp::processReadBuffer ()
default:
event->reName ("PeerImp::unknown");
event->reName ("Peer::unknown");
WriteLog (lsWARNING, Peer) << "Unknown Msg: " << type;
WriteLog (lsWARNING, Peer) << strHex (&mReadbuf[0], mReadbuf.size ());
}
@@ -1033,7 +1048,6 @@ void PeerImp::recvHello (protocol::TMHello& packet)
uint32 maxTime = ourTime + 20;
#ifdef BEAST_DEBUG
if (packet.has_nettime ())
{
int64 to = ourTime;
@@ -1137,6 +1151,23 @@ void PeerImp::recvHello (protocol::TMHello& packet)
// Consider us connected. No longer accepting mtHELLO.
mHelloed = true;
{
if (! m_remoteAddressSet)
{
boost::asio::ip::address addr (
getNativeSocket().remote_endpoint().address());
if (addr.is_v4())
{
boost::asio::ip::address_v4::bytes_type bytes (addr.to_v4().to_bytes());
m_remoteAddress = IPEndpoint (IPEndpoint::V4 (
bytes[0], bytes[1], bytes[2], bytes[3]), 0);
}
}
getApp().getPeers().getPeerFinder().onPeerConnected (
RipplePublicKey (mNodePublic), m_remoteAddress, m_isInbound);
}
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection type.)
@@ -1617,6 +1648,12 @@ void PeerImp::recvPeers (protocol::TMPeers& packet)
addr.s_addr = packet.nodes (i).ipv4 ();
{
IPEndpoint::V4 v4 (ntohl (addr.s_addr));
IPEndpoint ep (v4, packet.nodes (i).ipv4port ());
getApp().getPeers().getPeerFinder().onPeerLegacyEndpoint (ep);
}
std::string strIP (inet_ntoa (addr));
int iPort = packet.nodes (i).ipv4port ();
@@ -1629,6 +1666,58 @@ void PeerImp::recvPeers (protocol::TMPeers& packet)
}
}
void PeerImp::recvEndpoints (protocol::TMEndpoints& packet)
{
std::vector <PeerFinder::Endpoint> endpoints;
endpoints.reserve (packet.endpoints().size());
for (int i = 0; i < packet.endpoints ().size (); ++i)
{
PeerFinder::Endpoint endpoint;
protocol::TMEndpoint const& tm (packet.endpoints(i));
// hops
endpoint.hops = tm.hops();
// ipv4
if (endpoint.hops > 0)
{
in_addr addr;
addr.s_addr = tm.ipv4().ipv4();
IPEndpoint::V4 v4 (ntohl (addr.s_addr));
endpoint.address = IPEndpoint (v4, 0);
endpoint.port = tm.ipv4().ipv4port ();
}
else
{
// This Endpoint describes the peer we are connected to.
// We will take the remote address seen on the socket and
// store that in the Endpoint. If this is the first time,
// then we'll verify that their listener can receive incoming
// by performing a connectivity test.
//
bassert (m_remoteAddressSet);
endpoint.address = m_remoteAddress.withPort (0);
endpoint.port = tm.ipv4().ipv4port ();
}
// slots
endpoint.incomingSlotsAvailable = tm.slots();
// maxSlots
endpoint.incomingSlotsMax = tm.maxslots();
// uptimeMinutes
endpoint.uptimeMinutes = tm.uptimeminutes();
endpoints.push_back (endpoint);
}
getApp().getPeers().getPeerFinder().onPeerEndpoints (
PeerFinder::PeerID (mNodePublic), endpoints);
}
void PeerImp::recvGetObjectByHash (const boost::shared_ptr<protocol::TMGetObjectByHash>& ptr)
{
protocol::TMGetObjectByHash& packet = *ptr;