diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index 12373a0da6..ed7df37439 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -28,7 +28,6 @@ void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort) ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) : mCtx(boost::asio::ssl::context::sslv23), - bScanning(false), mScanTimer(io_service), mPolicyTimer(io_service) { @@ -53,36 +52,29 @@ void ConnectionPool::start() bool ConnectionPool::getTopNAddrs(int n,std::vector& addrs) { // XXX Filter out other local addresses (like ipv6) - if (!theConfig.PEER_IP.empty() && theConfig.PEER_IP != "127.0.0.1") + Database* db = theApp->getWalletDB()->getDB(); + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + + SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps LIMIT %d") % n) ) { - addrs.push_back(str(boost::format("%s %d") % theConfig.PEER_IP % theConfig.PEER_PORT)); - } + std::string str; - { - Database* db = theApp->getWalletDB()->getDB(); - ScopedLock sl(theApp->getWalletDB()->getDBLock()); + db->getStr(0,str); - SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps LIMIT %d") % n) ) - { - std::string str; - - db->getStr(0,str); - - addrs.push_back(str); - } + addrs.push_back(str); } return true; } -bool ConnectionPool::savePeer(const std::string& strIp, int iPort,char code) +bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code) { Database* db = theApp->getWalletDB()->getDB(); - std::string ipPort= sqlEscape(str(boost::format("%s %d") % strIp % iPort)); + std::string ipPort = sqlEscape(str(boost::format("%s %d") % strIp % iPort)); ScopedLock sl(theApp->getWalletDB()->getDBLock()); - std::string sql=str(boost::format("SELECT COUNT(*) FROM PeerIps WHERE IpPort=%s;") % ipPort); + std::string sql = str(boost::format("SELECT COUNT(*) FROM PeerIps WHERE IpPort=%s;") % ipPort); if (db->executeSQL(sql) && db->startIterRows()) { if ( db->getInt(0)==0) @@ -99,6 +91,9 @@ bool ConnectionPool::savePeer(const std::string& strIp, int iPort,char code) return false; } +// An available peer is one we had no trouble connect to last time and that we are not currently knowingly connected or connecting +// too. +// // <-- true, if a peer is available to connect to bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) { @@ -108,6 +103,7 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) // Convert mIpMap (list of open connections) to a vector of " ". { boost::mutex::scoped_lock sl(mPeerLock); + vstrIpPort.reserve(mIpMap.size()); BOOST_FOREACH(pipPeer ipPeer, mIpMap) @@ -115,19 +111,22 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) const std::string& strIp = ipPeer.first.first; int iPort = ipPeer.first.second; - vstrIpPort.push_back(db->escape(str(boost::format("%s %d") % strIp % iPort))); + vstrIpPort.push_back(sqlEscape(str(boost::format("%s %d") % strIp % iPort))); } } // Get the first IpPort entry which is not in vector and which is not scheduled for scanning. std::string strIpPort; - ScopedLock sl(theApp->getWalletDB()->getDBLock()); - if (db->executeSQL(str(boost::format("SELECT IpPort FROM PeerIps WHERE ScanNext IS NULL AND IpPort NOT IN (%s) LIMIT 1;") - % strJoin(vstrIpPort.begin(), vstrIpPort.end(), ","))) - && db->startIterRows()) { - db->getStr("IpPort", strIpPort); + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + + if (db->executeSQL(str(boost::format("SELECT IpPort FROM PeerIps WHERE ScanNext IS NULL AND IpPort NOT IN (%s) LIMIT 1;") + % strJoin(vstrIpPort.begin(), vstrIpPort.end(), ","))) + && db->startIterRows()) + { + db->getStr("IpPort", strIpPort); + } } bool bAvailable = !strIpPort.empty(); @@ -138,6 +137,7 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) return bAvailable; } +// Make sure we have at least low water connections. void ConnectionPool::policyLowWater() { std::string strIp; @@ -166,7 +166,7 @@ void ConnectionPool::policyLowWater() { // Try to start connection. if (!peerConnect(strIp, iPort)) - throw std::runtime_error("Internal error: standby was already connected."); + Log(lsINFO) << "policyLowWater was already connected."; // Check if we need more. policyLowWater(); @@ -175,24 +175,14 @@ void ConnectionPool::policyLowWater() void ConnectionPool::policyEnforce() { - boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); - - Log(lsTRACE) << "policyEnforce: begin: " << tpNow; - - // Cancel any in progrss timer. + // Cancel any in progress timer. (void) mPolicyTimer.cancel(); // Enforce policies. policyLowWater(); // Schedule next enforcement. - boost::posix_time::ptime tpNext; - - tpNext = boost::posix_time::second_clock::universal_time()+boost::posix_time::seconds(POLICY_INTERVAL_SECONDS); - - Log(lsTRACE) << "policyEnforce: schedule : " << tpNext; - - mPolicyTimer.expires_at(tpNext); + mPolicyTimer.expires_at(boost::posix_time::second_clock::universal_time()+boost::posix_time::seconds(POLICY_INTERVAL_SECONDS)); mPolicyTimer.async_wait(boost::bind(&ConnectionPool::policyHandler, this, _1)); } @@ -212,6 +202,8 @@ void ConnectionPool::policyHandler(const boost::system::error_code& ecResult) } } +// YYY: Should probably do this in the background. +// YYY: Might end up sending to disconnected peer? void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg) { boost::mutex::scoped_lock sl(mPeerLock); @@ -226,18 +218,18 @@ void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg) } } +// Schedule a connection via scanning. +// // Add or modify into PeerIps as a manual entry for immediate scanning. // Requires sane IP and port. void ConnectionPool::connectTo(const std::string& strIp, int iPort) { - Database* db = theApp->getWalletDB()->getDB(); - std::string ipPort = sqlEscape(str(boost::format("%s %d") % strIp % iPort)); - { + Database* db = theApp->getWalletDB()->getDB(); ScopedLock sl(theApp->getWalletDB()->getDBLock()); db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IpPort,Score,Source,ScanNext) values (%s,%d,'%c',0);") - % ipPort + % sqlEscape(str(boost::format("%s %d") % strIp % iPort)) % theApp->getUNL().iSourceScore(UniqueNodeList::vsManual) % char(UniqueNodeList::vsManual))); } @@ -245,46 +237,52 @@ void ConnectionPool::connectTo(const std::string& strIp, int iPort) scanRefresh(); } +// Start a connection, if not already known connected or connecting. +// // <-- true, if already connected. -bool ConnectionPool::peerConnect(const std::string& strIp, int iPort) +Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort) { - bool bConnecting; - ipPort ip = make_pair(strIp, iPort); + ipPort pipPeer = make_pair(strIp, iPort); + Peer::pointer ppResult = Peer::pointer(); boost::unordered_map::iterator it; - boost::mutex::scoped_lock sl(mPeerLock); - - it = mIpMap.find(ip); - - if (it == mIpMap.end()) { - // Did not find it. Not already connecting or connected. - std::cerr << "ConnectionPool::peerConnect: Connecting: " - << strIp << " " << iPort << std::endl; + boost::mutex::scoped_lock sl(mPeerLock); - Peer::pointer peer(Peer::create(theApp->getIOService(), mCtx)); + if ((it = mIpMap.find(pipPeer)) == mIpMap.end()) + { + Peer::pointer ppNew(Peer::create(theApp->getIOService(), mCtx)); - mIpMap[ip] = peer; + // Did not find it. Not already connecting or connected. + ppNew->connect(strIp, iPort); - peer->connect(strIp, iPort); + mIpMap[pipPeer] = ppNew; - // ++miConnectStarting; + ppResult = ppNew; + // ++miConnectStarting; + } + else + { + // Found it. Already connected. - bConnecting = true; + nothing(); + } + } + + if (ppResult) + { + Log(lsINFO) << "Pool: Connecting: " << ADDRESS_SHARED(ppResult) << ": " << strIp << " " << iPort; } else { - // Found it. Already connected. - std::cerr << "ConnectionPool::peerConnect: Already connected: " - << strIp << " " << iPort << std::endl; - - bConnecting = false; + Log(lsINFO) << "Pool: Already connected: " << strIp << " " << iPort; } - return bConnecting; + return ppResult; } +// Returns information on verified peers. Json::Value ConnectionPool::getPeersJson() { Json::Value ret(Json::arrayValue); @@ -316,18 +314,16 @@ std::vector ConnectionPool::getPeerVector() } // Now know peer's node public key. Determine if we want to stay connected. +// <-- bNew: false = redundant bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naPeer, const std::string& strIP, int iPort) { - bool bSuccess; + bool bNew = false; - std::cerr << "ConnectionPool::peerConnected: " - << naPeer.humanNodePublic() << " " << strIP << " " << iPort << std::endl; assert(!!peer); if (naPeer == theApp->getWallet().getNodePublic()) { - std::cerr << "ConnectionPool::peerConnected: To self." << std::endl; - bSuccess = false; + Log(lsINFO) << "Pool: Connected: self: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort; } else { @@ -337,31 +333,48 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naP if (itCm == mConnectedMap.end()) { // New connection. + Log(lsINFO) << "Pool: Connected: new: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort; + mConnectedMap[naPeer] = peer; - bSuccess = true; + bNew = true; + } + // Found in map, already connected. + else if (!strIP.empty()) + { + // Was an outbound connection, we know IP and port. + // Note in previous connection how to reconnect. + if (itCm->second->getIP().empty()) + { + // Old peer did not know it's IP. + Log(lsINFO) << "Pool: Connected: redundant: outbound: " << ADDRESS_SHARED(peer) << " discovered: " << ADDRESS_SHARED(itCm->second) << ": " << strIP << " " << iPort; + + itCm->second->setIpPort(strIP, iPort); + + // Add old connection to identified connection list. + mIpMap[make_pair(strIP, iPort)] = itCm->second; + } + else + { + // Old peer knew its IP. Do nothing. + Log(lsINFO) << "Pool: Connected: redundant: outbound: rediscovered: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort; + + nothing(); + } } else { - // Found in map, already connected. - if (!strIP.empty()) - { - // Was an outbound connection, we know IP and port. - // Note in previous connection how to reconnect. - itCm->second->peerIpPort(strIP, iPort); - } + Log(lsINFO) << "Pool: Connected: redundant: inbound: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort; - bSuccess = false; // Don't need a redundant connection. + nothing(); } } - return bSuccess; + return bNew; } -// We maintain a map of public key to peer for connectted and verified peers. Maintain it. +// We maintain a map of public key to peer for connected and verified peers. Maintain it. void ConnectionPool::peerDisconnected(Peer::pointer peer, const NewcoinAddress& naPeer) { - std::cerr << "ConnectionPool::peerDisconnected: " << peer->getIP() << " " << peer->getPort() << std::endl; - if (naPeer.isValid()) { boost::unordered_map::iterator itCm; @@ -373,22 +386,36 @@ void ConnectionPool::peerDisconnected(Peer::pointer peer, const NewcoinAddress& if (itCm == mConnectedMap.end()) { // Did not find it. Not already connecting or connected. - std::cerr << "Internal Error: peer connection was inconsistent." << std::endl; - // XXX Bad error. + Log(lsWARNING) << "Pool: disconnected: Internal Error: mConnectedMap was inconsistent."; + // XXX Maybe bad error, considering we have racing connections, may not so bad. + } + else if (itCm->second != peer) + { + Log(lsWARNING) << "Pool: disconected: non canonical entry"; + + nothing(); } else { // Found it. Delete it. mConnectedMap.erase(itCm); + + Log(lsINFO) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort(); } } + else + { + Log(lsINFO) << "Pool: disconnected: anonymous: " << peer->getIP() << " " << peer->getPort(); + } } -void ConnectionPool::peerScanSet(const std::string& strIp, int iPort) +// Schedule for immediate scanning, if not already scheduled. +// +// <-- true, scanRefresh needed. +bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort) { - std::cerr << "ConnectionPool::peerScanSet: " << strIp << " " << iPort << std::endl; - std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); + bool bScanDirty = false; ScopedLock sl(theApp->getWalletDB()->getDBLock()); Database* db = theApp->getWalletDB()->getDB(); @@ -404,38 +431,48 @@ void ConnectionPool::peerScanSet(const std::string& strIp, int iPort) boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval); - std::cerr << str(boost::format("peerScanSet: scan schedule: %s %s (next %s, delay=%s)") - % mScanIp % mScanPort % tpNext % iInterval) << std::endl; + Log(lsINFO) << str(boost::format("Scanning: schedule create: %s %s (next %s, delay=%s)") + % mScanIp % mScanPort % tpNext % iInterval); db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;") % iToSeconds(tpNext) % iInterval % db->escape(strIpPort))); + + bScanDirty = true; } else { - // Scanning connection terminate, already scheduled for retry. - nothing(); + // Scanning connection terminated, already scheduled for retry. + boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); + boost::posix_time::ptime tpNext = ptFromSeconds(db->getInt("ScanNext")); + int iInterval = (tpNext-tpNow).seconds(); + + Log(lsINFO) << str(boost::format("Scanning: schedule exists: %s %s (next %s, delay=%s)") + % mScanIp % mScanPort % tpNext % iInterval); } } else { - std::cerr << "peerScanSet: peer wasn't in PeerIps: " << strIp << " " << iPort << std::endl; + Log(lsWARNING) << "Scanning: peer wasn't in PeerIps: " << strIp << " " << iPort; } + + return bScanDirty; } -void ConnectionPool::peerFailed(const std::string& strIp, int iPort) +// --> strIp: not empty +void ConnectionPool::peerClosed(Peer::pointer peer, const std::string& strIp, int iPort) { - std::cerr << "ConnectionPool::peerFailed: " << strIp << " " << iPort << std::endl; - ipPort ipPeer = make_pair(strIp, iPort); + ipPort ipPeer = make_pair(strIp, iPort); + bool bScanRefresh = false; - // If the fail was our scan, we are no longer scanning. - if (bScanning && !mScanIp.compare(strIp) && mScanPort == iPort) + // If the connecttion was our scan, we are no longer scanning. + if (mScanning && mScanning == peer) { - bScanning = false; + Log(lsINFO) << "Scanning: scan fail: " << strIp << " " << iPort; - // Look for more to scan. - scanRefresh(); + mScanning = Peer::pointer(); // No longer scanning. + bScanRefresh = true; // Look for more to scan. } bool bScanSet = false; @@ -449,33 +486,47 @@ void ConnectionPool::peerFailed(const std::string& strIp, int iPort) if (itIp == mIpMap.end()) { // Did not find it. Not already connecting or connected. - std::cerr << "Internal Error: peer wasn't connected: " - << ipPeer.first << " " << ipPeer.second << std::endl; - // XXX Bad error. + Log(lsWARNING) << "Pool: Disconnect: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + // XXX Internal error. + } + else if (mIpMap[ipPeer] == peer) + { + // We were the identified connection. + Log(lsINFO) << "Pool: Disconnect: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + + // Delete our entry. + mIpMap.erase(itIp); + + // We want to connect again. + bScanSet = true; } else { - // Found it. Delete it. - mIpMap.erase(itIp); - - bScanSet = true; + // Found it. But, we were redundent. + Log(lsINFO) << "Pool: Disconnect: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; } } if (bScanSet) { - // Schedule for scanning. - peerScanSet(ipPeer.first, ipPeer.second); + // Since we disconnnected, try to schedule for scanning again. + bScanRefresh = peerScanSet(ipPeer.first, ipPeer.second); } + + if (bScanRefresh) + scanRefresh(); } -void ConnectionPool::peerVerified(const std::string& strIp, int iPort) +void ConnectionPool::peerVerified(Peer::pointer peer) { - if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort) + if (mScanning && mScanning == peer) { + std::string strIp = peer->getIP(); + int iPort = peer->getPort(); + std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); - std::cerr << str(boost::format("peerVerified: %s %s (scan off)") % mScanIp % mScanPort) << std::endl; + Log(lsINFO) << str(boost::format("Scanning: connected: %s %s (scan off)") % strIp % iPort); // Scan completed successfully. { @@ -487,8 +538,9 @@ void ConnectionPool::peerVerified(const std::string& strIp, int iPort) // XXX Check error. } - bScanning = false; - scanRefresh(); + mScanning = Peer::pointer(); + + scanRefresh(); // Continue scanning. } } @@ -511,10 +563,10 @@ void ConnectionPool::scanHandler(const boost::system::error_code& ecResult) // Scan ips as per db entries. void ConnectionPool::scanRefresh() { - if (bScanning) + if (mScanning) { // Currently scanning, will scan again after completion. - std::cerr << "scanRefresh: already scanning" << std::endl; + Log(lsTRACE) << "Scanning: already scanning"; nothing(); } @@ -527,8 +579,8 @@ void ConnectionPool::scanRefresh() int iInterval; { - ScopedLock sl(theApp->getWalletDB()->getDBLock()); - Database *db=theApp->getWalletDB()->getDB(); + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + Database* db = theApp->getWalletDB()->getDB(); if (db->executeSQL("SELECT * FROM PeerIps INDEXED BY PeerScanIndex WHERE ScanNext NOT NULL ORDER BY ScanNext LIMIT 1;") && db->startIterRows()) @@ -551,7 +603,7 @@ void ConnectionPool::scanRefresh() if (tpNow.is_not_a_date_time()) { - std::cerr << "scanRefresh: no scan needed." << std::endl; + Log(lsINFO) << "Scanning: stop."; (void) mScanTimer.cancel(); } @@ -562,14 +614,13 @@ void ConnectionPool::scanRefresh() (void) mScanTimer.cancel(); - bScanning = true; - - iInterval *= 2; + // XXX iInterval *= 2; + iInterval = 0; iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN); tpNext = tpNow + boost::posix_time::seconds(iInterval); - std::cerr << str(boost::format("scanRefresh: now scanning: %s %s (next %s, delay=%s)") + Log(lsTRACE) << str(boost::format("Scanning: %s %s (next %s, delay=%s)") % mScanIp % mScanPort % tpNext % iInterval) << std::endl; { @@ -583,7 +634,8 @@ void ConnectionPool::scanRefresh() // XXX Check error. } - if (!peerConnect(mScanIp, mScanPort)) + mScanning = peerConnect(mScanIp, mScanPort); + if (!mScanning) { // Already connected. Try again. scanRefresh(); @@ -591,7 +643,7 @@ void ConnectionPool::scanRefresh() } else { - std::cerr << "scanRefresh: next due: " << tpNow << std::endl; + Log(lsINFO) << "Scanning: next: " << tpNow; mScanTimer.expires_at(tpNext); mScanTimer.async_wait(boost::bind(&ConnectionPool::scanHandler, this, _1)); diff --git a/src/ConnectionPool.h b/src/ConnectionPool.h index 41a7182ea2..c71c59fef6 100644 --- a/src/ConnectionPool.h +++ b/src/ConnectionPool.h @@ -32,24 +32,24 @@ private: boost::asio::ssl::context mCtx; - bool bScanning; + Peer::pointer mScanning; boost::asio::deadline_timer mScanTimer; std::string mScanIp; int mScanPort; - void scanHandler(const boost::system::error_code& ecResult); + void scanHandler(const boost::system::error_code& ecResult); boost::asio::deadline_timer mPolicyTimer; - void policyHandler(const boost::system::error_code& ecResult); + void policyHandler(const boost::system::error_code& ecResult); // Peers we are establishing a connection with as a client. // int miConnectStarting; - bool peerAvailable(std::string& strIp, int& iPort); - void peerScanSet(const std::string& strIp, int iPort); + bool peerAvailable(std::string& strIp, int& iPort); + bool peerScanSet(const std::string& strIp, int iPort); - bool peerConnect(const std::string& strIp, int iPort); + Peer::pointer peerConnect(const std::string& strIp, int iPort); public: ConnectionPool(boost::asio::io_service& io_service); @@ -78,10 +78,10 @@ public: void peerDisconnected(Peer::pointer peer, const NewcoinAddress& naPeer); // As client accepted. - void peerVerified(const std::string& strIp, int iPort); + void peerVerified(Peer::pointer peer); // As client failed connect and be accepted. - void peerFailed(const std::string& strIp, int iPort); + void peerClosed(Peer::pointer peer, const std::string& strIp, int iPort); Json::Value getPeersJson(); std::vector getPeerVector(); diff --git a/src/Peer.cpp b/src/Peer.cpp index f90b2935a4..9839ed2f6a 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -27,14 +27,15 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) mSocketSsl(io_service, ctx), mVerifyTimer(io_service) { + // Log(lsDEBUG) << "CREATING PEER: " << ADDRESS(this); } void Peer::handle_write(const boost::system::error_code& error, size_t bytes_transferred) { + if (error) + Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error; #ifdef DEBUG - if(error) - std::cerr << "Peer::handle_write Error: " << error << " bytes: " << bytes_transferred << std::endl; -// else +// if (!error) // std::cerr << "Peer::handle_write bytes: "<< bytes_transferred << std::endl; #endif @@ -57,14 +58,21 @@ void Peer::handle_write(const boost::system::error_code& error, size_t bytes_tra } } +void Peer::setIpPort(const std::string& strIP, int iPort) +{ + mIpPort = make_pair(strIP, iPort); + + Log(lsDEBUG) << "Peer: Set: " + << ADDRESS(this) << "> " + << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); +} + void Peer::detach(const char *rsn) { -#ifdef DEBUG - Log(lsTRACE) << "DETACHING PEER: " << rsn - << ": " - << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") - << " " << getIP() << " " << getPort() << std::endl; -#endif + Log(lsDEBUG) << "Peer: Detach: " + << ADDRESS(this) << "> " + << rsn << ": " + << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); boost::system::error_code ecCancel; @@ -72,6 +80,12 @@ void Peer::detach(const char *rsn) mSendQ.clear(); + // We may close more than once. + boost::system::error_code ecShutdown; + getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ecShutdown); + + getSocket().close(); + if (mNodePublic.isValid()) { theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic); @@ -82,11 +96,16 @@ void Peer::detach(const char *rsn) if (!mIpPort.first.empty()) { // Connection might be part of scanning. Inform connect failed. - // Might need to scan. Inform connection disconnected. - theApp->getConnectionPool().peerFailed(mIpPort.first, mIpPort.second); + // Might need to scan. Inform connection closed. + theApp->getConnectionPool().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second); - mIpPort.first.empty(); // Be idompotent. + mIpPort.first.clear(); // Be idompotent. } + + Log(lsDEBUG) << "Peer: Detach: " + << ADDRESS(this) << "< " + << rsn << ": " + << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); } void Peer::handleVerifyTimer(const boost::system::error_code& ecResult) @@ -100,14 +119,15 @@ void Peer::handleVerifyTimer(const boost::system::error_code& ecResult) } else if (ecResult) { - std::cerr << "Peer verify timer error: " << std::endl; + Log(lsINFO) << "Peer verify timer error"; // Can't do anything sound. abort(); } else { - std::cerr << "Peer failed to verify in time." << std::endl; + Log(lsINFO) << "Peer: Verify: Peer failed to verify in time."; + detach("hvt"); } } @@ -120,8 +140,8 @@ void Peer::connect(const std::string strIp, int iPort) mClientConnect = true; - std::cerr << "Peer::connect: " << strIp << " " << iPort << std::endl; - mIpPort = make_pair(strIp, iPort); + mIpPort = make_pair(strIp, iPort); + mIpPortConnect = mIpPort; assert(!mIpPort.first.empty()); boost::asio::ip::tcp::resolver::query query(strIp, boost::lexical_cast(iPortAct), @@ -132,7 +152,7 @@ void Peer::connect(const std::string strIp, int iPort) if (err || itrEndpoint == boost::asio::ip::tcp::resolver::iterator()) { - std::cerr << "Peer::connect: Bad IP" << std::endl; + Log(lsWARNING) << "Peer: Connect: Bad IP: " << strIp; detach("c"); return; } @@ -143,7 +163,7 @@ void Peer::connect(const std::string strIp, int iPort) if (err) { - std::cerr << "Peer::connect: Failed to set timer." << std::endl; + Log(lsWARNING) << "Peer: Connect: Failed to set timer."; detach("c2"); return; } @@ -151,10 +171,10 @@ void Peer::connect(const std::string strIp, int iPort) if (!err) { - std::cerr << "Peer::connect: Connecting: " << mIpPort.first << " " << mIpPort.second << std::endl; + Log(lsINFO) << "Peer: Connect: Outbound: " << ADDRESS(this) << ": " << mIpPort.first << " " << mIpPort.second; boost::asio::async_connect( - mSocketSsl.lowest_layer(), + getSocket(), itrEndpoint, boost::bind( &Peer::handleConnect, @@ -172,7 +192,7 @@ void Peer::handleStart(const boost::system::error_code& error) { if (error) { - std::cerr << "Peer::handleStart: failed:" << error << std::endl; + Log(lsINFO) << "Peer: Handshake: Error: " << error.category().name() << ": " << error.message() << ": " << error; detach("hs"); } else @@ -187,7 +207,7 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip { if (error) { - std::cerr << "Connect peer: failed:" << error << std::endl; + Log(lsINFO) << "Peer: Connect: Error: " << error.category().name() << ": " << error.message() << ": " << error; detach("hc"); } else @@ -205,25 +225,26 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip // - We don't bother remembering the inbound IP or port. Only useful for debugging. void Peer::connected(const boost::system::error_code& error) { - boost::asio::ip::tcp::endpoint ep = mSocketSsl.lowest_layer().remote_endpoint(); + boost::asio::ip::tcp::endpoint ep = getSocket().remote_endpoint(); int iPort = ep.port(); std::string strIp = ep.address().to_string(); mClientConnect = false; + mIpPortConnect = make_pair(strIp, iPort); if (iPort == SYSTEM_PEER_PORT) //TODO: Why are you doing this? iPort = -1; if (error) { - std::cerr << "Remote peer: accept error: " << strIp << " " << iPort << " : " << error << std::endl; + Log(lsINFO) << "Peer: Inbound: Error: " << ADDRESS(this) << ": " << strIp << " " << iPort << " : " << error.category().name() << ": " << error.message() << ": " << error; detach("ctd"); } else { - // Not redundant ip and port, add to connection list. + // Not redundant ip and port, handshake, and start. - std::cerr << "Remote peer: accepted: " << strIp << " " << iPort << std::endl; + Log(lsINFO) << "Peer: Inbound: Accepted: " << ADDRESS(this) << ": " << strIp << " " << iPort; mSocketSsl.set_verify_mode(boost::asio::ssl::verify_none); @@ -290,8 +311,8 @@ void Peer::handle_read_header(const boost::system::error_code& error) } else { + Log(lsINFO) << "Peer: Header: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; detach("hrh2"); - std::cerr << "Peer::handle_read_header: Error: " << error << std::endl; } } @@ -304,8 +325,8 @@ void Peer::handle_read_body(const boost::system::error_code& error) } else { + Log(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; detach("hrb"); - std::cerr << "Peer::handle_read_body: Error: " << error << std::endl; } } @@ -517,27 +538,25 @@ void Peer::processReadBuffer() void Peer::recvHello(newcoin::TMHello& packet) { #ifdef DEBUG - std::cerr << "Recv(Hello) v=" << packet.version() - << ", index=" << packet.ledgerindex() - << std::endl; + Log(lsINFO) << "Recv(Hello) v=" << packet.version() << ", index=" << packet.ledgerindex(); #endif bool bDetach = true; if (!mNodePublic.setNodePublic(packet.nodepublic())) { - std::cerr << "Recv(Hello): Disconnect: Bad node public key." << std::endl; + Log(lsINFO) << "Recv(Hello): Disconnect: Bad node public key."; } else if (!mNodePublic.verifyNodePublic(mCookieHash, packet.nodeproof())) { // Unable to verify they have private key for claimed public key. - std::cerr << "Recv(Hello): Disconnect: Failed to verify session." << std::endl; + Log(lsINFO) << "Recv(Hello): Disconnect: Failed to verify session."; } else if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort())) { // Already connected, self, or some other reason. - std::cerr << "Recv(Hello): Disconnect: Extraneous connection." << std::endl; + Log(lsINFO) << "Recv(Hello): Disconnect: Extraneous connection."; } else { // Successful connection. - std::cerr << "Recv(Hello): Connect: " << mNodePublic.humanNodePublic() << std::endl; + Log(lsINFO) << "Recv(Hello): Connect: " << mNodePublic.humanNodePublic(); // Cancel verification timeout. (void) mVerifyTimer.cancel(); @@ -545,16 +564,18 @@ void Peer::recvHello(newcoin::TMHello& packet) if (mClientConnect) { // If we connected due to scan, no longer need to scan. - theApp->getConnectionPool().peerVerified(mIpPort.first, mIpPort.second); + theApp->getConnectionPool().peerVerified(shared_from_this()); // No longer connecting as client. mClientConnect = false; } else { - // At this point we could add the inbound connection to our IP list. However, the inbound IP address might be that of - // a NAT. It would be best to only add it if and only if we can immediately verify it. - nothing(); + // Take a guess at remotes address. + std::string strIP = getSocket().remote_endpoint().address().to_string(); + int iPort = packet.ipv4port(); + + theApp->getConnectionPool().savePeer(strIP, iPort, UniqueNodeList::vsInbound); } // Consider us connected. No longer accepting mtHELLO. @@ -580,8 +601,10 @@ void Peer::recvHello(newcoin::TMHello& packet) mNodePublic.clear(); detach("recvh"); } - - sendGetPeers(); + else + { + sendGetPeers(); + } } void Peer::recvTransaction(newcoin::TMTransaction& packet) @@ -714,27 +737,27 @@ void Peer::recvGetPeers(newcoin::TMGetPeers& packet) { std::vector addrs; - theApp->getConnectionPool().getTopNAddrs(30,addrs); + theApp->getConnectionPool().getTopNAddrs(30, addrs); - if (addrs.size()) + if (!addrs.empty()) { newcoin::TMPeers peers; - for(int n=0; nset_ipv4(inet_addr(strIP.c_str())); - addr->set_ipv4port(port); + addr->set_ipv4port(iPort); - std::cout << "Teaching about: " << strIP << std::endl; + Log(lsINFO) << "Peer: Teaching: " << ADDRESS(this) << ": " << n << ": " << strIP << " " << iPort; } - PackedMessage::pointer message = boost::make_shared(peers, newcoin::mtPEERS); sendPacket(message); } @@ -746,20 +769,17 @@ void Peer::recvPeers(newcoin::TMPeers& packet) for (int i = 0; i < packet.nodes().size(); ++i) { in_addr addr; - addr.s_addr=packet.nodes(i).ipv4(); - std::string strIP(inet_ntoa(addr)); - int port=packet.nodes(i).ipv4port(); - if (strIP == "0.0.0.0") + addr.s_addr = packet.nodes(i).ipv4(); + + std::string strIP(inet_ntoa(addr)); + int iPort = packet.nodes(i).ipv4port(); + + if (strIP != "0.0.0.0" && strIP != "127.0.0.1") { - strIP = mSocketSsl.lowest_layer().remote_endpoint().address().to_string(); - } + Log(lsINFO) << "Peer: Learning: " << ADDRESS(this) << ": " << i << ": " << strIP << " " << iPort; - // if (strIP != "127.0.0.1") - { - std::cout << "Learning about: " << strIP << std::endl; - - theApp->getConnectionPool().savePeer(strIP, port, 'T'); + theApp->getConnectionPool().savePeer(strIP, iPort, UniqueNodeList::vsTold); } } } @@ -1059,12 +1079,14 @@ void Peer::sendGetPeers() { // get other peers this guy knows about newcoin::TMGetPeers getPeers; + getPeers.set_doweneedthis(1); + PackedMessage::pointer packet = boost::make_shared(getPeers, newcoin::mtGET_PEERS); + sendPacket(packet); } - void Peer::punishPeer(PeerPunish) { } @@ -1072,9 +1094,16 @@ void Peer::punishPeer(PeerPunish) Json::Value Peer::getJson() { Json::Value ret(Json::objectValue); - ret["ip"] = mIpPort.first; - ret["port"] = mIpPort.second; + ret["this"] = ADDRESS(this); ret["public_key"] = mNodePublic.ToString(); + ret["ip"] = mIpPortConnect.first; + ret["port"] = mIpPortConnect.second; + + if (!mIpPort.first.empty()) + { + ret["verified_ip"] = mIpPort.first; + ret["verified_port"] = mIpPort.second; + } return ret; } diff --git a/src/Peer.h b/src/Peer.h index 99c0204af2..e707ec28ce 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -34,6 +34,7 @@ private: bool mConnected; // True, if hello accepted. NewcoinAddress mNodePublic; // Node public key of peer. ipPort mIpPort; + ipPort mIpPortConnect; uint256 mCookieHash; // network state information @@ -99,7 +100,7 @@ public: std::string& getIP() { return mIpPort.first; } int getPort() { return mIpPort.second; } - void peerIpPort(const std::string& strIP, int iPort) { mIpPort = make_pair(strIP, iPort); } + void setIpPort(const std::string& strIP, int iPort); static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) {