diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index ed7df37439..a75220930f 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -69,6 +69,8 @@ bool ConnectionPool::getTopNAddrs(int n,std::vector& addrs) bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code) { + bool bNew = false; + Database* db = theApp->getWalletDB()->getDB(); std::string ipPort = sqlEscape(str(boost::format("%s %d") % strIp % iPort)); @@ -77,10 +79,10 @@ bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code) std::string sql = str(boost::format("SELECT COUNT(*) FROM PeerIps WHERE IpPort=%s;") % ipPort); if (db->executeSQL(sql) && db->startIterRows()) { - if ( db->getInt(0)==0) + if (!db->getInt(0)) { db->executeSQL(str(boost::format("INSERT INTO PeerIps (IpPort,Score,Source) values (%s,0,'%c');") % ipPort % code)); - return true; + bNew = true; }// else we already had this peer } else @@ -88,7 +90,10 @@ bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code) std::cout << "Error saving Peer" << std::endl; } - return false; + if (bNew) + scanRefresh(); + + return bNew; } // An available peer is one we had no trouble connect to last time and that we are not currently knowingly connected or connecting @@ -125,7 +130,7 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) % strJoin(vstrIpPort.begin(), vstrIpPort.end(), ","))) && db->startIterRows()) { - db->getStr("IpPort", strIpPort); + strIpPort = db->getStrBinary("IpPort"); } } @@ -147,6 +152,8 @@ void ConnectionPool::policyLowWater() if (mConnectedMap.size() > theConfig.PEER_CONNECT_LOW_WATER) { // Above low water mark, don't need more connections. + Log(lsTRACE) << "Pool: Low water: sufficient connections: " << mConnectedMap.size() << "/" << theConfig.PEER_CONNECT_LOW_WATER; + nothing(); } #if 0 @@ -159,14 +166,18 @@ void ConnectionPool::policyLowWater() else if (!peerAvailable(strIp, iPort)) { // No more connections available to start. + Log(lsTRACE) << "Pool: Low water: no peers available."; + // XXX Might ask peers for more ips. nothing(); } else { // Try to start connection. + Log(lsTRACE) << "Pool: Low water: start connection."; + if (!peerConnect(strIp, iPort)) - Log(lsINFO) << "policyLowWater was already connected."; + Log(lsINFO) << "Pool: Low water: already connected."; // Check if we need more. policyLowWater(); @@ -431,7 +442,7 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort) boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval); - Log(lsINFO) << str(boost::format("Scanning: schedule create: %s %s (next %s, delay=%s)") + Log(lsINFO) << str(boost::format("Pool: Scan: schedule create: %s %s (next %s, delay=%s)") % mScanIp % mScanPort % tpNext % iInterval); db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;") @@ -443,18 +454,18 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort) } else { - // Scanning connection terminated, already scheduled for retry. + // Scan connection terminated, already scheduled for retry. boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); boost::posix_time::ptime tpNext = ptFromSeconds(db->getInt("ScanNext")); int iInterval = (tpNext-tpNow).seconds(); - Log(lsINFO) << str(boost::format("Scanning: schedule exists: %s %s (next %s, delay=%s)") + Log(lsINFO) << str(boost::format("Pool: Scan: schedule exists: %s %s (next %s, delay=%s)") % mScanIp % mScanPort % tpNext % iInterval); } } else { - Log(lsWARNING) << "Scanning: peer wasn't in PeerIps: " << strIp << " " << iPort; + Log(lsWARNING) << "Pool: Scan: peer wasn't in PeerIps: " << strIp << " " << iPort; } return bScanDirty; @@ -466,17 +477,17 @@ void ConnectionPool::peerClosed(Peer::pointer peer, const std::string& strIp, in ipPort ipPeer = make_pair(strIp, iPort); bool bScanRefresh = false; - // If the connecttion was our scan, we are no longer scanning. + // If the connection was our scan, we are no longer scanning. if (mScanning && mScanning == peer) { - Log(lsINFO) << "Scanning: scan fail: " << strIp << " " << iPort; + Log(lsINFO) << "Pool: Scan: scan fail: " << strIp << " " << iPort; mScanning = Peer::pointer(); // No longer scanning. bScanRefresh = true; // Look for more to scan. } - bool bScanSet = false; - + // Determine if closed peer was redundant. + bool bRedundant = true; { boost::mutex::scoped_lock sl(mPeerLock); boost::unordered_map::iterator itIp; @@ -486,31 +497,30 @@ void ConnectionPool::peerClosed(Peer::pointer peer, const std::string& strIp, in if (itIp == mIpMap.end()) { // Did not find it. Not already connecting or connected. - Log(lsWARNING) << "Pool: Disconnect: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + Log(lsWARNING) << "Pool: Closed: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; // XXX Internal error. } else if (mIpMap[ipPeer] == peer) { // We were the identified connection. - Log(lsINFO) << "Pool: Disconnect: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + Log(lsINFO) << "Pool: Closed: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; // Delete our entry. mIpMap.erase(itIp); - // We want to connect again. - bScanSet = true; + bRedundant = false; } else { // Found it. But, we were redundent. - Log(lsINFO) << "Pool: Disconnect: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + Log(lsINFO) << "Pool: Closed: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; } } - if (bScanSet) + if (!bRedundant) { - // Since we disconnnected, try to schedule for scanning again. - bScanRefresh = peerScanSet(ipPeer.first, ipPeer.second); + // If closed was not redundant schedule if not already scheduled. + bScanRefresh = peerScanSet(ipPeer.first, ipPeer.second) || bScanRefresh; } if (bScanRefresh) @@ -526,7 +536,7 @@ void ConnectionPool::peerVerified(Peer::pointer peer) std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); - Log(lsINFO) << str(boost::format("Scanning: connected: %s %s (scan off)") % strIp % iPort); + Log(lsINFO) << str(boost::format("Pool: Scan: connected: %s %s %s (scan off)") % ADDRESS_SHARED(peer) % strIp % iPort); // Scan completed successfully. { @@ -566,7 +576,7 @@ void ConnectionPool::scanRefresh() if (mScanning) { // Currently scanning, will scan again after completion. - Log(lsTRACE) << "Scanning: already scanning"; + Log(lsTRACE) << "Pool: Scan: already scanning"; nothing(); } @@ -603,7 +613,7 @@ void ConnectionPool::scanRefresh() if (tpNow.is_not_a_date_time()) { - Log(lsINFO) << "Scanning: stop."; + Log(lsINFO) << "Pool: Scan: stop."; (void) mScanTimer.cancel(); } @@ -614,14 +624,14 @@ void ConnectionPool::scanRefresh() (void) mScanTimer.cancel(); - // XXX iInterval *= 2; - iInterval = 0; iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN); tpNext = tpNow + boost::posix_time::seconds(iInterval); - Log(lsTRACE) << str(boost::format("Scanning: %s %s (next %s, delay=%s)") - % mScanIp % mScanPort % tpNext % iInterval) << std::endl; + iInterval *= 2; + + Log(lsINFO) << str(boost::format("Pool: Scan: Now: %s %s (next %s, delay=%s)") + % mScanIp % mScanPort % tpNext % iInterval); { ScopedLock sl(theApp->getWalletDB()->getDBLock()); @@ -643,7 +653,8 @@ void ConnectionPool::scanRefresh() } else { - Log(lsINFO) << "Scanning: next: " << tpNow; + Log(lsINFO) << str(boost::format("Pool: Scan: Next: %s (next %s, delay=%s)") + % strIpPort % tpNext % (tpNext-tpNow).seconds()); mScanTimer.expires_at(tpNext); mScanTimer.async_wait(boost::bind(&ConnectionPool::scanHandler, this, _1)); diff --git a/src/Peer.cpp b/src/Peer.cpp index fd18cf6090..da00a437c2 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -24,6 +24,7 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) : mConnected(false), + mDetaching(false), mSocketSsl(io_service, ctx), mVerifyTimer(io_service) { @@ -32,8 +33,6 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) void Peer::handle_write(const boost::system::error_code& error, size_t bytes_transferred) { - if (error) - Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error; #ifdef DEBUG // if (!error) // std::cerr << "Peer::handle_write bytes: "<< bytes_transferred << std::endl; @@ -43,11 +42,14 @@ void Peer::handle_write(const boost::system::error_code& error, size_t bytes_tra if (error) { - detach("hw"); - return; - } + if (!mDetaching) + { + Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error; - if (!mSendQ.empty()) + detach("hw"); + } + + } else if (!mSendQ.empty()) { PackedMessage::pointer packet = mSendQ.front(); if(packet) @@ -69,43 +71,49 @@ void Peer::setIpPort(const std::string& strIP, int iPort) void Peer::detach(const char *rsn) { - Log(lsDEBUG) << "Peer: Detach: " - << ADDRESS(this) << "> " - << rsn << ": " - << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); - boost::system::error_code ecCancel; - - (void) mVerifyTimer.cancel(); - - mSendQ.clear(); - - // We may close more than once. - boost::system::error_code ecShutdown; - getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ecShutdown); - - getSocket().close(); - - if (mNodePublic.isValid()) + if (!mDetaching) { - theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic); + mDetaching = true; // Race is ok. - mNodePublic.clear(); // Be idompotent. + Log(lsDEBUG) << "Peer: Detach: " + << ADDRESS(this) << "> " + << rsn << ": " + << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); + + boost::system::error_code ecCancel; + + (void) mVerifyTimer.cancel(); + + mSendQ.clear(); + + // We may close more than once. + boost::system::error_code ecShutdown; + getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ecShutdown); + + getSocket().close(); + + if (mNodePublic.isValid()) + { + theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic); + + mNodePublic.clear(); // Be idompotent. + } + + if (!mIpPort.first.empty()) + { + // Connection might be part of scanning. Inform connect failed. + // Might need to scan. Inform connection closed. + theApp->getConnectionPool().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second); + + mIpPort.first.clear(); // Be idompotent. + } + + Log(lsDEBUG) << "Peer: Detach: " + << ADDRESS(this) << "< " + << rsn << ": " + << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); } - - if (!mIpPort.first.empty()) - { - // Connection might be part of scanning. Inform connect failed. - // Might need to scan. Inform connection closed. - theApp->getConnectionPool().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second); - - mIpPort.first.clear(); // Be idompotent. - } - - Log(lsDEBUG) << "Peer: Detach: " - << ADDRESS(this) << "< " - << rsn << ": " - << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); } void Peer::handleVerifyTimer(const boost::system::error_code& ecResult) @@ -235,12 +243,7 @@ void Peer::connected(const boost::system::error_code& error) if (iPort == SYSTEM_PEER_PORT) //TODO: Why are you doing this? iPort = -1; - if (error) - { - Log(lsINFO) << "Peer: Inbound: Error: " << ADDRESS(this) << ": " << strIp << " " << iPort << " : " << error.category().name() << ": " << error.message() << ": " << error; - detach("ctd"); - } - else + if (!error) { // Not redundant ip and port, handshake, and start. @@ -251,6 +254,12 @@ void Peer::connected(const boost::system::error_code& error) mSocketSsl.async_handshake(boost::asio::ssl::stream::server, boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error)); } + else if (!mDetaching) + { + Log(lsINFO) << "Peer: Inbound: Error: " << ADDRESS(this) << ": " << strIp << " " << iPort << " : " << error.category().name() << ": " << error.message() << ": " << error; + + detach("ctd"); + } } void Peer::sendPacketForce(PackedMessage::pointer packet) @@ -309,7 +318,7 @@ void Peer::handle_read_header(const boost::system::error_code& error) } start_read_body(msg_len); } - else + else if (!mDetaching) { Log(lsINFO) << "Peer: Header: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; detach("hrh2"); @@ -323,7 +332,7 @@ void Peer::handle_read_body(const boost::system::error_code& error) processReadBuffer(); start_read_header(); } - else + else if (!mDetaching) { Log(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; detach("hrb"); @@ -342,7 +351,7 @@ void Peer::processReadBuffer() // If connected and get a mtHELLO or if not connected and get a non-mtHELLO, wrong message was sent. if (mConnected == (type == newcoin::mtHELLO)) { - std::cerr << "Wrong message type: " << type << std::endl; + Log(lsWARNING) << "Wrong message type: " << type; detach("prb1"); } else @@ -542,6 +551,9 @@ void Peer::recvHello(newcoin::TMHello& packet) #endif bool bDetach = true; + // Cancel verification timeout. + (void) mVerifyTimer.cancel(); + if (!mNodePublic.setNodePublic(packet.nodepublic())) { Log(lsINFO) << "Recv(Hello): Disconnect: Bad node public key."; @@ -550,50 +562,53 @@ void Peer::recvHello(newcoin::TMHello& packet) { // Unable to verify they have private key for claimed public key. Log(lsINFO) << "Recv(Hello): Disconnect: Failed to verify session."; } - else if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort())) - { // Already connected, self, or some other reason. - Log(lsINFO) << "Recv(Hello): Disconnect: Extraneous connection."; - } else { // Successful connection. Log(lsINFO) << "Recv(Hello): Connect: " << mNodePublic.humanNodePublic(); - // Cancel verification timeout. - (void) mVerifyTimer.cancel(); - if (mClientConnect) { // If we connected due to scan, no longer need to scan. theApp->getConnectionPool().peerVerified(shared_from_this()); + } - // No longer connecting as client. - mClientConnect = false; + if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort())) + { // Already connected, self, or some other reason. + Log(lsINFO) << "Recv(Hello): Disconnect: Extraneous connection."; } else { - // Take a guess at remotes address. - std::string strIP = getSocket().remote_endpoint().address().to_string(); - int iPort = packet.ipv4port(); + if (mClientConnect) + { + // No longer connecting as client. + mClientConnect = false; + } + else + { + // Take a guess at remotes address. + std::string strIP = getSocket().remote_endpoint().address().to_string(); + int iPort = packet.ipv4port(); - theApp->getConnectionPool().savePeer(strIP, iPort, UniqueNodeList::vsInbound); + theApp->getConnectionPool().savePeer(strIP, iPort, UniqueNodeList::vsInbound); + } + + // Consider us connected. No longer accepting mtHELLO. + mConnected = true; + + // XXX Set timer: connection is in grace period to be useful. + // XXX Set timer: connection idle (idle may vary depending on connection type.) + + if ((packet.has_closedledger()) && (packet.closedledger().size() == (256 / 8))) + { + memcpy(mClosedLedgerHash.begin(), packet.closedledger().data(), 256 / 8); + if ((packet.has_previousledger()) && (packet.previousledger().size() == (256 / 8))) + memcpy(mPreviousLedgerHash.begin(), packet.previousledger().data(), 256 / 8); + else mPreviousLedgerHash.zero(); + mClosedLedgerTime = boost::posix_time::second_clock::universal_time(); + } + + bDetach = false; } - - // Consider us connected. No longer accepting mtHELLO. - mConnected = true; - - // XXX Set timer: connection is in grace period to be useful. - // XXX Set timer: connection idle (idle may vary depending on connection type.) - - if ((packet.has_closedledger()) && (packet.closedledger().size() == (256 / 8))) - { - memcpy(mClosedLedgerHash.begin(), packet.closedledger().data(), 256 / 8); - if ((packet.has_previousledger()) && (packet.previousledger().size() == (256 / 8))) - memcpy(mPreviousLedgerHash.begin(), packet.previousledger().data(), 256 / 8); - else mPreviousLedgerHash.zero(); - mClosedLedgerTime = boost::posix_time::second_clock::universal_time(); - } - - bDetach = false; } if (bDetach) diff --git a/src/Peer.h b/src/Peer.h index 6d38399ec7..f4ecc82d30 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -32,6 +32,7 @@ public: private: bool mClientConnect; // In process of connecting as client. bool mConnected; // True, if hello accepted. + bool mDetaching; // True, if detaching. NewcoinAddress mNodePublic; // Node public key of peer. ipPort mIpPort; ipPort mIpPortConnect;