diff --git a/src/Application.cpp b/src/Application.cpp index 7c097cb26..01daae4b1 100644 --- a/src/Application.cpp +++ b/src/Application.cpp @@ -44,6 +44,7 @@ DatabaseCon::~DatabaseCon() Application::Application() : mUNL(mIOService), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL), + mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL) { nothing(); diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index 22c154f3e..e322cbe40 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -1,18 +1,19 @@ -#include -#include - #include "ConnectionPool.h" #include "Config.h" #include "Peer.h" #include "Application.h" #include "utils.h" -// XXX On Windows make sure OpenSSL PRNG is seeded: EGADS +#include +#include +#include +#include -ConnectionPool::ConnectionPool() : - iConnecting(0), - mCtx(boost::asio::ssl::context::sslv23) +ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) : + mCtx(boost::asio::ssl::context::sslv23), + bScanning(false), + mScanTimer(io_service) { mCtx.set_options( boost::asio::ssl::context::default_workarounds @@ -26,6 +27,9 @@ ConnectionPool::ConnectionPool() : void ConnectionPool::start() { // XXX Start running policy. + + // Start scanning. + scanRefresh(); } // XXX Broken: also don't send a message to a peer if we got it from the peer. @@ -75,6 +79,7 @@ bool ConnectionPool::peerRegister(Peer::pointer peer, const std::string& strIp, bool ConnectionPool::connectTo(const std::string& strIp, int iPort) { + bool bConnecting; ipPort ip = make_pair(strIp, iPort); boost::unordered_map::iterator it; @@ -94,15 +99,19 @@ bool ConnectionPool::connectTo(const std::string& strIp, int iPort) mIpMap[ip] = peer; peer->connect(strIp, iPort); + + bConnecting = true; } else { // Found it. Already connected. std::cerr << "ConnectionPool::connectTo: Already connected: " << strIp << " " << iPort << std::endl; + + bConnecting = false; } - return true; + return bConnecting; } Json::Value ConnectionPool::getPeersJson() @@ -129,7 +138,7 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& na) if (na == theApp->getWallet().getNodePublic()) { std::cerr << "ConnectionPool::peerConnected: To self." << std::endl; - bSuccess = false; + bSuccess = false; } else { @@ -183,6 +192,143 @@ void ConnectionPool::peerDisconnected(Peer::pointer peer, const ipPort& ipPeer, } } +void ConnectionPool::peerFailed(const std::string& strIp, int iPort) +{ + if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort) + { + bScanning = false; + scanRefresh(); + } +} + +void ConnectionPool::peerVerified(const std::string& strIp, int iPort) +{ + if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort) + { + // Scan completed successfully. + { + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + Database *db=theApp->getWalletDB()->getDB(); + + db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=NULL,ScanInterval=0 WHERE IP=%s AND Port=%d;") + % db->escape(strIp) + % iPort)); + // XXX Check error. + } + + bScanning = false; + scanRefresh(); + } +} + +void ConnectionPool::scanHandler(const boost::system::error_code& ecResult) +{ + if (ecResult == boost::asio::error::operation_aborted) + { + nothing(); + } + else if (!ecResult) + { + scanRefresh(); + } + else + { + throw std::runtime_error("Internal error: unexpected deadline error."); + } +} + +// Scan ips as per db entries. +void ConnectionPool::scanRefresh() +{ + if (bScanning) + { + // Currently scanning, will scan again after completion. + std::cerr << "scanRefresh: already scanning" << std::endl; + + nothing(); + } + else + { + // Discover if there are entries that need scanning. + boost::posix_time::ptime tpNext; + boost::posix_time::ptime tpNow; + std::string strIp; + int iPort; + int iInterval; + + { + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + Database *db=theApp->getWalletDB()->getDB(); + + if (db->executeSQL("SELECT * FROM PeerIps INDEXED BY PeerScanIndex WHERE ScanNext NOT NULL ORDER BY ScanNext LIMIT 1;") + && db->startIterRows()) + { + // Have an entry to scan. + int iNext = db->getInt("ScanNext"); + + tpNext = ptFromSeconds(iNext); + tpNow = boost::posix_time::second_clock::universal_time(); + + db->getStr("IP", strIp); + iPort = db->getInt("Port"); + iInterval = db->getInt("ScanInterval"); + } + else + { + // No entries to scan. + tpNow = boost::posix_time::ptime(boost::posix_time::not_a_date_time); + } + } + + if (tpNow.is_not_a_date_time()) + { + std::cerr << "scanRefresh: no scan needed." << std::endl; + + (void) mScanTimer.cancel(); + } + else if (tpNext <= tpNow) + { + // Scan it. + (void) mScanTimer.cancel(); + + std::cerr << "scanRefresh: scanning: " << strIp << " " << iPort << std::endl; + bScanning = true; + mScanIp = strIp; + mScanPort = iPort; + + iInterval *= 2; + iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN); + + tpNext = tpNow + boost::posix_time::seconds(iInterval); + + { + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + Database *db=theApp->getWalletDB()->getDB(); + + db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IP=%s AND Port=%d;") + % iToSeconds(tpNext) + % iInterval + % db->escape(strIp) + % iPort)); + // XXX Check error. + } + + if (!connectTo(mScanIp, mScanPort)) + { + // Already connected. Try again. + scanRefresh(); + } + } + else + { + std::cerr << "scanRefresh: next due: " << tpNow << std::endl; + + mScanTimer.expires_at(tpNext); + mScanTimer.async_wait(boost::bind(&ConnectionPool::scanHandler, this, _1)); + } + } +} + #if 0 bool ConnectionPool::isMessageKnown(PackedMessage::pointer msg) { diff --git a/src/ConnectionPool.h b/src/ConnectionPool.h index 521090d94..d30f1a581 100644 --- a/src/ConnectionPool.h +++ b/src/ConnectionPool.h @@ -16,22 +16,25 @@ class ConnectionPool private: boost::mutex mPeerLock; - typedef std::pair naPeer; - - // Count of peers we are in progress of connecting to. - // We are in progress until we know their network public key. - int iConnecting; + typedef std::pair naPeer; // Peers we are connecting with and non-thin peers we are connected to. - boost::unordered_map mIpMap; + boost::unordered_map mIpMap; // Non-thin peers which we are connected to. - boost::unordered_map mConnectedMap; + boost::unordered_map mConnectedMap; boost::asio::ssl::context mCtx; + bool bScanning; + boost::asio::deadline_timer mScanTimer; + std::string mScanIp; + int mScanPort; + + void scanHandler(const boost::system::error_code& ecResult); + public: - ConnectionPool(); + ConnectionPool(boost::asio::io_service& io_service); // Begin enforcing connection policy. void start(); @@ -56,8 +59,20 @@ public: // No longer connected. void peerDisconnected(Peer::pointer peer, const ipPort& ipPeer, const NewcoinAddress& naPeer); + // As client accepted. + void peerVerified(const std::string& strIp, int iPort); + + // As client failed connect and be accepted. + void peerFailed(const std::string& strIp, int iPort); + Json::Value getPeersJson(); + // + // Scanning + // + + void scanRefresh(); + #if 0 //std::vector > mBroadcastMessages; diff --git a/src/Peer.cpp b/src/Peer.cpp index 842bc6d13..85e228643 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -63,6 +63,9 @@ void Peer::detach() // mSocketSsl.close(); if (!mIpPort.first.empty()) { + if (mClientConnect) + theApp->getConnectionPool().peerFailed(mIpPort.first, mIpPort.second); + theApp->getConnectionPool().peerDisconnected(shared_from_this(), mIpPort, mNodePublic); mIpPort.first.clear(); } @@ -97,6 +100,8 @@ void Peer::connect(const std::string strIp, int iPort) { int iPortAct = iPort < 0 ? SYSTEM_PEER_PORT : iPort; + mClientConnect = true; + std::cerr << "Peer::connect: " << strIp << " " << iPort << std::endl; mIpPort = make_pair(strIp, iPort); @@ -156,7 +161,7 @@ void Peer::handleStart(const boost::system::error_code& error) } } -// Connect as client. +// Connect ssl as client. void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it) { if (error) @@ -176,13 +181,15 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip } } -// Connect as server. +// Connect ssl as server. void Peer::connected(const boost::system::error_code& error) { boost::asio::ip::tcp::endpoint ep = mSocketSsl.lowest_layer().remote_endpoint(); int iPort = ep.port(); std::string strIp = ep.address().to_string(); + mClientConnect = false; + if (iPort == SYSTEM_PEER_PORT) iPort = -1; @@ -502,6 +509,14 @@ void Peer::recvHello(newcoin::TMHello& packet) // Cancel verification timeout. (void) mVerifyTimer.cancel(); + if (mClientConnect) + { + theApp->getConnectionPool().peerVerified(mIpPort.first, mIpPort.second); + + // No longer connecting as client. + mClientConnect = false; + } + // XXX Set timer: connection is in grace period to be useful. // XXX Set timer: connection idle (idle may vary depending on connection type.) diff --git a/src/Peer.h b/src/Peer.h index 1c4240934..f8992840c 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -31,6 +31,7 @@ public: void handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it); private: + bool mClientConnect; // In process of connecting as client. NewcoinAddress mNodePublic; // Node public key of peer. ipPort mIpPort; uint256 mCookieHash;