#include "ConnectionPool.h" #include #include #include #include #include #include "Config.h" #include "Peer.h" #include "Application.h" #include "utils.h" #include "Log.h" // How often to enforce policies. #define POLICY_INTERVAL_SECONDS 5 void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort) { std::vector vIpPort; boost::split(vIpPort, strIpPort, boost::is_any_of(" ")); strIp = vIpPort[0]; iPort = boost::lexical_cast(vIpPort[1]); } ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) : mCtx(boost::asio::ssl::context::sslv23), bScanning(false), mScanTimer(io_service), mPolicyTimer(io_service) { mCtx.set_options( boost::asio::ssl::context::default_workarounds | boost::asio::ssl::context::no_sslv2 | boost::asio::ssl::context::single_dh_use); if (1 != SSL_CTX_set_cipher_list(mCtx.native_handle(), theConfig.PEER_SSL_CIPHER_LIST.c_str())) std::runtime_error("Error setting cipher list (no valid ciphers)."); } void ConnectionPool::start() { // Start running policy. policyEnforce(); // Start scanning. scanRefresh(); } bool ConnectionPool::getTopNAddrs(int n,std::vector& addrs) { // XXX Filter out other local addresses (like ipv6) if (!theConfig.PEER_IP.empty() && theConfig.PEER_IP != "127.0.0.1") { addrs.push_back(str(boost::format("%s %d") % theConfig.PEER_IP % theConfig.PEER_PORT)); } { Database* db = theApp->getWalletDB()->getDB(); ScopedLock sl(theApp->getWalletDB()->getDBLock()); SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps LIMIT %d") % n) ) { std::string str; db->getStr(0,str); addrs.push_back(str); } } return true; } bool ConnectionPool::savePeer(const std::string& strIp, int iPort,char code) { Database* db = theApp->getWalletDB()->getDB(); std::string ipPort= sqlEscape(str(boost::format("%s %d") % strIp % iPort)); ScopedLock sl(theApp->getWalletDB()->getDBLock()); std::string sql=str(boost::format("SELECT COUNT(*) FROM PeerIps WHERE IpPort=%s;") % ipPort); if (db->executeSQL(sql) && db->startIterRows()) { if ( db->getInt(0)==0) { db->executeSQL(str(boost::format("INSERT INTO PeerIps (IpPort,Score,Source) values (%s,0,'%c');") % ipPort % code)); return true; }// else we already had this peer } else { std::cout << "Error saving Peer" << std::endl; } return false; } // <-- true, if a peer is available to connect to bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) { Database* db = theApp->getWalletDB()->getDB(); std::vector vstrIpPort; // Convert mIpMap (list of open connections) to a vector of " ". { boost::mutex::scoped_lock sl(mPeerLock); vstrIpPort.reserve(mIpMap.size()); BOOST_FOREACH(pipPeer ipPeer, mIpMap) { const std::string& strIp = ipPeer.first.first; int iPort = ipPeer.first.second; vstrIpPort.push_back(db->escape(str(boost::format("%s %d") % strIp % iPort))); } } // Get the first IpPort entry which is not in vector and which is not scheduled for scanning. std::string strIpPort; ScopedLock sl(theApp->getWalletDB()->getDBLock()); if (db->executeSQL(str(boost::format("SELECT IpPort FROM PeerIps WHERE ScanNext IS NULL AND IpPort NOT IN (%s) LIMIT 1;") % strJoin(vstrIpPort.begin(), vstrIpPort.end(), ","))) && db->startIterRows()) { db->getStr("IpPort", strIpPort); } bool bAvailable = !strIpPort.empty(); if (bAvailable) splitIpPort(strIpPort, strIp, iPort); return bAvailable; } void ConnectionPool::policyLowWater() { std::string strIp; int iPort; // Find an entry to connect to. if (mConnectedMap.size() > theConfig.PEER_CONNECT_LOW_WATER) { // Above low water mark, don't need more connections. nothing(); } #if 0 else if (miConnectStarting == theConfig.PEER_START_MAX) { // Too many connections starting to start another. nothing(); } #endif else if (!peerAvailable(strIp, iPort)) { // No more connections available to start. // XXX Might ask peers for more ips. nothing(); } else { // Try to start connection. if (!peerConnect(strIp, iPort)) throw std::runtime_error("Internal error: standby was already connected."); // Check if we need more. policyLowWater(); } } void ConnectionPool::policyEnforce() { boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); Log(lsTRACE) << "policyEnforce: begin: " << tpNow; // Cancel any in progrss timer. (void) mPolicyTimer.cancel(); // Enforce policies. policyLowWater(); // Schedule next enforcement. boost::posix_time::ptime tpNext; tpNext = boost::posix_time::second_clock::universal_time()+boost::posix_time::seconds(POLICY_INTERVAL_SECONDS); Log(lsTRACE) << "policyEnforce: schedule : " << tpNext; mPolicyTimer.expires_at(tpNext); mPolicyTimer.async_wait(boost::bind(&ConnectionPool::policyHandler, this, _1)); } void ConnectionPool::policyHandler(const boost::system::error_code& ecResult) { if (ecResult == boost::asio::error::operation_aborted) { nothing(); } else if (!ecResult) { policyEnforce(); } else { throw std::runtime_error("Internal error: unexpected deadline error."); } } void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg) { boost::mutex::scoped_lock sl(mPeerLock); BOOST_FOREACH(naPeer pair, mConnectedMap) { Peer::pointer peer = pair.second; if (!peer) std::cerr << "CP::RM null peer in list" << std::endl; else if ((!fromPeer || !(peer.get() == fromPeer)) && peer->isConnected()) peer->sendPacket(msg); } } // Add or modify into PeerIps as a manual entry for immediate scanning. // Requires sane IP and port. void ConnectionPool::connectTo(const std::string& strIp, int iPort) { Database* db = theApp->getWalletDB()->getDB(); std::string ipPort = sqlEscape(str(boost::format("%s %d") % strIp % iPort)); { ScopedLock sl(theApp->getWalletDB()->getDBLock()); db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IpPort,Score,Source,ScanNext) values (%s,%d,'%c',0);") % ipPort % theApp->getUNL().iSourceScore(UniqueNodeList::vsManual) % char(UniqueNodeList::vsManual))); } scanRefresh(); } // <-- true, if already connected. bool ConnectionPool::peerConnect(const std::string& strIp, int iPort) { bool bConnecting; ipPort ip = make_pair(strIp, iPort); boost::unordered_map::iterator it; boost::mutex::scoped_lock sl(mPeerLock); it = mIpMap.find(ip); if (it == mIpMap.end()) { // Did not find it. Not already connecting or connected. std::cerr << "ConnectionPool::peerConnect: Connecting: " << strIp << " " << iPort << std::endl; Peer::pointer peer(Peer::create(theApp->getIOService(), mCtx)); mIpMap[ip] = peer; peer->connect(strIp, iPort); // ++miConnectStarting; bConnecting = true; } else { // Found it. Already connected. std::cerr << "ConnectionPool::peerConnect: Already connected: " << strIp << " " << iPort << std::endl; bConnecting = false; } return bConnecting; } Json::Value ConnectionPool::getPeersJson() { Json::Value ret(Json::arrayValue); std::vector vppPeers = getPeerVector(); BOOST_FOREACH(Peer::pointer peer, vppPeers) { ret.append(peer->getJson()); } return ret; } std::vector ConnectionPool::getPeerVector() { std::vector ret; boost::mutex::scoped_lock sl(mPeerLock); ret.reserve(mConnectedMap.size()); BOOST_FOREACH(naPeer pair, mConnectedMap) { assert(!!pair.second); ret.push_back(pair.second); } return ret; } // Now know peer's node public key. Determine if we want to stay connected. bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naPeer, const std::string& strIP, int iPort) { bool bSuccess; std::cerr << "ConnectionPool::peerConnected: " << naPeer.humanNodePublic() << " " << strIP << " " << iPort << std::endl; assert(!!peer); if (naPeer == theApp->getWallet().getNodePublic()) { std::cerr << "ConnectionPool::peerConnected: To self." << std::endl; bSuccess = false; } else { boost::mutex::scoped_lock sl(mPeerLock); boost::unordered_map::iterator itCm = mConnectedMap.find(naPeer); if (itCm == mConnectedMap.end()) { // New connection. mConnectedMap[naPeer] = peer; bSuccess = true; } else { // Found in map, already connected. if (!strIP.empty()) { // Was an outbound connection, we know IP and port. // Note in previous connection how to reconnect. itCm->second->peerIpPort(strIP, iPort); } bSuccess = false; // Don't need a redundant connection. } } return bSuccess; } // We maintain a map of public key to peer for connectted and verified peers. Maintain it. void ConnectionPool::peerDisconnected(Peer::pointer peer, const NewcoinAddress& naPeer) { std::cerr << "ConnectionPool::peerDisconnected: " << peer->getIP() << " " << peer->getPort() << std::endl; if (naPeer.isValid()) { boost::unordered_map::iterator itCm; boost::mutex::scoped_lock sl(mPeerLock); itCm = mConnectedMap.find(naPeer); if (itCm == mConnectedMap.end()) { // Did not find it. Not already connecting or connected. std::cerr << "Internal Error: peer connection was inconsistent." << std::endl; // XXX Bad error. } else { // Found it. Delete it. mConnectedMap.erase(itCm); } } } void ConnectionPool::peerScanSet(const std::string& strIp, int iPort) { std::cerr << "ConnectionPool::peerScanSet: " << strIp << " " << iPort << std::endl; std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); ScopedLock sl(theApp->getWalletDB()->getDBLock()); Database* db = theApp->getWalletDB()->getDB(); if (db->executeSQL(str(boost::format("SELECT ScanNext FROM PeerIps WHERE IpPort=%s;") % sqlEscape(strIpPort))) && db->startIterRows()) { if (db->getNull("ScanNext")) { // Non-scanning connection terminated. Schedule for scanning. int iInterval = theConfig.PEER_SCAN_INTERVAL_MIN; boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval); std::cerr << str(boost::format("peerScanSet: scan schedule: %s %s (next %s, delay=%s)") % mScanIp % mScanPort % tpNext % iInterval) << std::endl; db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;") % iToSeconds(tpNext) % iInterval % db->escape(strIpPort))); } else { // Scanning connection terminate, already scheduled for retry. nothing(); } } else { std::cerr << "peerScanSet: peer wasn't in PeerIps: " << strIp << " " << iPort << std::endl; } } void ConnectionPool::peerFailed(const std::string& strIp, int iPort) { std::cerr << "ConnectionPool::peerFailed: " << strIp << " " << iPort << std::endl; ipPort ipPeer = make_pair(strIp, iPort); // If the fail was our scan, we are no longer scanning. if (bScanning && !mScanIp.compare(strIp) && mScanPort == iPort) { bScanning = false; // Look for more to scan. scanRefresh(); } bool bScanSet = false; { boost::mutex::scoped_lock sl(mPeerLock); boost::unordered_map::iterator itIp; itIp = mIpMap.find(ipPeer); if (itIp == mIpMap.end()) { // Did not find it. Not already connecting or connected. std::cerr << "Internal Error: peer wasn't connected: " << ipPeer.first << " " << ipPeer.second << std::endl; // XXX Bad error. } else { // Found it. Delete it. mIpMap.erase(itIp); bScanSet = true; } } if (bScanSet) { // Schedule for scanning. peerScanSet(ipPeer.first, ipPeer.second); } } void ConnectionPool::peerVerified(const std::string& strIp, int iPort) { if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort) { std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); std::cerr << str(boost::format("peerVerified: %s %s (scan off)") % mScanIp % mScanPort) << std::endl; // Scan completed successfully. { ScopedLock sl(theApp->getWalletDB()->getDBLock()); Database *db=theApp->getWalletDB()->getDB(); db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=NULL,ScanInterval=0 WHERE IpPort=%s;") % db->escape(strIpPort))); // XXX Check error. } bScanning = false; scanRefresh(); } } void ConnectionPool::scanHandler(const boost::system::error_code& ecResult) { if (ecResult == boost::asio::error::operation_aborted) { nothing(); } else if (!ecResult) { scanRefresh(); } else { throw std::runtime_error("Internal error: unexpected deadline error."); } } // Scan ips as per db entries. void ConnectionPool::scanRefresh() { if (bScanning) { // Currently scanning, will scan again after completion. std::cerr << "scanRefresh: already scanning" << std::endl; nothing(); } else { // Discover if there are entries that need scanning. boost::posix_time::ptime tpNext; boost::posix_time::ptime tpNow; std::string strIpPort; int iInterval; { ScopedLock sl(theApp->getWalletDB()->getDBLock()); Database *db=theApp->getWalletDB()->getDB(); if (db->executeSQL("SELECT * FROM PeerIps INDEXED BY PeerScanIndex WHERE ScanNext NOT NULL ORDER BY ScanNext LIMIT 1;") && db->startIterRows()) { // Have an entry to scan. int iNext = db->getInt("ScanNext"); tpNext = ptFromSeconds(iNext); tpNow = boost::posix_time::second_clock::universal_time(); db->getStr("IpPort", strIpPort); iInterval = db->getInt("ScanInterval"); } else { // No entries to scan. tpNow = boost::posix_time::ptime(boost::posix_time::not_a_date_time); } } if (tpNow.is_not_a_date_time()) { std::cerr << "scanRefresh: no scan needed." << std::endl; (void) mScanTimer.cancel(); } else if (tpNext <= tpNow) { // Scan it. splitIpPort(strIpPort, mScanIp, mScanPort); (void) mScanTimer.cancel(); bScanning = true; iInterval *= 2; iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN); tpNext = tpNow + boost::posix_time::seconds(iInterval); std::cerr << str(boost::format("scanRefresh: now scanning: %s %s (next %s, delay=%s)") % mScanIp % mScanPort % tpNext % iInterval) << std::endl; { ScopedLock sl(theApp->getWalletDB()->getDBLock()); Database *db=theApp->getWalletDB()->getDB(); db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;") % iToSeconds(tpNext) % iInterval % db->escape(strIpPort))); // XXX Check error. } if (!peerConnect(mScanIp, mScanPort)) { // Already connected. Try again. scanRefresh(); } } else { std::cerr << "scanRefresh: next due: " << tpNow << std::endl; mScanTimer.expires_at(tpNext); mScanTimer.async_wait(boost::bind(&ConnectionPool::scanHandler, this, _1)); } } } #if 0 bool ConnectionPool::isMessageKnown(PackedMessage::pointer msg) { for(unsigned int n=0; n