diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index e322cbe40..88f3adb4d 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -9,6 +9,16 @@ #include #include #include +#include + +static void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort) +{ + std::vector vIpPort; + boost::split(vIpPort, strIpPort, boost::is_any_of(" ")); + + strIp = vIpPort[0]; + iPort = boost::lexical_cast(vIpPort[1]); +} ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) : mCtx(boost::asio::ssl::context::sslv23), @@ -27,11 +37,86 @@ ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) : void ConnectionPool::start() { // XXX Start running policy. + policyEnforce(); // Start scanning. scanRefresh(); } +bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) +{ + Database* db = theApp->getWalletDB()->getDB(); + std::vector vstrIpPort; + + vstrIpPort.reserve(mIpMap.size()); + + pipPeer ipPeer; + BOOST_FOREACH(ipPeer, mIpMap) + { + std::string& strIp = ipPeer.first.first; + int iPort = ipPeer.first.second; + + vstrIpPort.push_back(db->escape(str(boost::format("%s %d") % strIp % iPort))); + } + + std::string strIpPort; + + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + if (db->executeSQL(str(boost::format("SELECT IpPort FROM PeerIps WHERE ScanNext IS NULL AND IpPort NOT IN (%s) LIMIT 1;") + % strJoin(vstrIpPort.begin(), vstrIpPort.end(), ","))) + && db->startIterRows()) + { + db->getStr("IpPort", strIpPort); + } + + bool bAvailable = !strIpPort.empty(); + + if (bAvailable) + splitIpPort(strIpPort, strIp, iPort); + + return bAvailable; +} + +void ConnectionPool::policyLowWater() +{ + std::string strIp; + int iPort; + + // Find an entry to connect to. + if (mConnectedMap.size() > theConfig.PEER_CONNECT_LOW_WATER) + { + // Above low water mark, don't need more connections. + nothing(); + } +#if 0 + else if (miConnectStarting == theConfig.PEER_START_MAX) + { + // Too many connections starting to start another. + nothing(); + } +#endif + else if (!peerAvailable(strIp, iPort)) + { + // No more connections available to start. + // XXX Might ask peers for more ips. + nothing(); + } + else + { + // Try to start connection. + if (!connectTo(strIp, iPort)) + throw std::runtime_error("Internal error: standby was already connected."); + + // Check if we need more. + policyLowWater(); + } +} + +void ConnectionPool::policyEnforce() +{ + policyLowWater(); +} + // XXX Broken: also don't send a message to a peer if we got it from the peer. void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg) { @@ -100,6 +185,8 @@ bool ConnectionPool::connectTo(const std::string& strIp, int iPort) peer->connect(strIp, iPort); + // ++miConnectStarting; + bConnecting = true; } else @@ -205,14 +292,14 @@ void ConnectionPool::peerVerified(const std::string& strIp, int iPort) { if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort) { + std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); // Scan completed successfully. { ScopedLock sl(theApp->getWalletDB()->getDBLock()); Database *db=theApp->getWalletDB()->getDB(); - db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=NULL,ScanInterval=0 WHERE IP=%s AND Port=%d;") - % db->escape(strIp) - % iPort)); + db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=NULL,ScanInterval=0 WHERE IpPort=%s;") + % db->escape(strIpPort))); // XXX Check error. } @@ -252,8 +339,7 @@ void ConnectionPool::scanRefresh() // Discover if there are entries that need scanning. boost::posix_time::ptime tpNext; boost::posix_time::ptime tpNow; - std::string strIp; - int iPort; + std::string strIpPort; int iInterval; { @@ -269,8 +355,7 @@ void ConnectionPool::scanRefresh() tpNext = ptFromSeconds(iNext); tpNow = boost::posix_time::second_clock::universal_time(); - db->getStr("IP", strIp); - iPort = db->getInt("Port"); + db->getStr("IpPort", strIpPort); iInterval = db->getInt("ScanInterval"); } else @@ -289,12 +374,12 @@ void ConnectionPool::scanRefresh() else if (tpNext <= tpNow) { // Scan it. + splitIpPort(strIpPort, mScanIp, mScanPort); + (void) mScanTimer.cancel(); - std::cerr << "scanRefresh: scanning: " << strIp << " " << iPort << std::endl; + std::cerr << "scanRefresh: scanning: " << mScanIp << " " << mScanPort << std::endl; bScanning = true; - mScanIp = strIp; - mScanPort = iPort; iInterval *= 2; iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN); @@ -305,11 +390,10 @@ void ConnectionPool::scanRefresh() ScopedLock sl(theApp->getWalletDB()->getDBLock()); Database *db=theApp->getWalletDB()->getDB(); - db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IP=%s AND Port=%d;") + db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;") % iToSeconds(tpNext) % iInterval - % db->escape(strIp) - % iPort)); + % db->escape(strIpPort))); // XXX Check error. } diff --git a/src/ConnectionPool.h b/src/ConnectionPool.h index d30f1a581..6404b6482 100644 --- a/src/ConnectionPool.h +++ b/src/ConnectionPool.h @@ -17,6 +17,7 @@ private: boost::mutex mPeerLock; typedef std::pair naPeer; + typedef std::pair pipPeer; // Peers we are connecting with and non-thin peers we are connected to. boost::unordered_map mIpMap; @@ -33,6 +34,11 @@ private: void scanHandler(const boost::system::error_code& ecResult); + // Peers we are establishing a connection with as a client. + // int miConnectStarting; + + bool peerAvailable(std::string& strIp, int& iPort); + public: ConnectionPool(boost::asio::io_service& io_service); @@ -73,6 +79,12 @@ public: void scanRefresh(); + // + // Connection policy + // + void policyLowWater(); + void policyEnforce(); + #if 0 //std::vector > mBroadcastMessages; diff --git a/src/DBInit.cpp b/src/DBInit.cpp index d07c57417..cf746b59f 100644 --- a/src/DBInit.cpp +++ b/src/DBInit.cpp @@ -198,13 +198,11 @@ const char *WalletDBInit[] = { // ScanInterval: // Delay between scans. "CREATE TABLE PeerIps ( \ - IP TEXT NOT NULL, \ - Port INTEGER NOT NULL DEFAULT -1, \ + IpPort TEXT NOT NULL PRIMARY KEY, \ Score INTEGER NOT NULL, \ Source CHARACTER(1) NOT NULL, \ ScanNext DATETIME DEFAULT 0, \ - ScanInterval INTEGER NOT NULL DEFAULT 0, \ - PRIMARY KEY (IP,Port) \ + ScanInterval INTEGER NOT NULL DEFAULT 0 \ );", "CREATE INDEX PeerScanIndex ON \ diff --git a/src/Peer.cpp b/src/Peer.cpp index 2207043b3..b0c88f637 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -516,6 +516,12 @@ void Peer::recvHello(newcoin::TMHello& packet) // No longer connecting as client. mClientConnect = false; } + else + { + // XXX At this point we could add the inbound connection to our IP list. However, the inbound IP address might be that of + // a NAT. It would be best to only add it if and only if we can immediatly verify it. + nothing(); + } // XXX Set timer: connection is in grace period to be useful. // XXX Set timer: connection idle (idle may vary depending on connection type.) diff --git a/src/UniqueNodeList.cpp b/src/UniqueNodeList.cpp index 1d49924b7..45fab4b5d 100644 --- a/src/UniqueNodeList.cpp +++ b/src/UniqueNodeList.cpp @@ -409,18 +409,16 @@ void UniqueNodeList::scoreCompute() BOOST_FOREACH(ipScore, umScore) { ipPort ipEndpoint = ipScore.first; - std::string strIP = ipEndpoint.first; - int iPort = ipEndpoint.second; + std::string strIpPort = str(boost::format("%s %d") % ipEndpoint.first % ipEndpoint.second); score iPoints = ipScore.second; - vstrValues.push_back(str(boost::format("(%s,%d,%d,'V')") - % db->escape(strIP) - % iPort + vstrValues.push_back(str(boost::format("(%s,%d,'V')") + % db->escape(strIpPort) % iPoints)); } // Set scores for each IP. - db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IP,Port,Score,Source) VALUES %s;") + db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IpPort,Score,Source) VALUES %s;") % strJoin(vstrValues.begin(), vstrValues.end(), ","))); }