From e6586de0792a71251047cfc96bad4390578f21f9 Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Mon, 30 Apr 2012 13:19:17 -0700 Subject: [PATCH 1/6] Add a MAX to utils. --- src/utils.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/utils.h b/src/utils.h index 02ee67f90..4b036a5b0 100644 --- a/src/utils.h +++ b/src/utils.h @@ -8,6 +8,10 @@ #define nothing() do {} while (0) +#ifndef MAX +#define MAX(x,y) ((x) < (y) ? (y) : (x)) +#endif + boost::posix_time::ptime ptEpoch(); int iToSeconds(boost::posix_time::ptime ptWhen); boost::posix_time::ptime ptFromSeconds(int iSeconds); From 8b40b7ad0036e59333ba646ec03d8d3bee93cac8 Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Mon, 30 Apr 2012 13:20:01 -0700 Subject: [PATCH 2/6] Have RPC connect report if already connected. --- src/RPCServer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/RPCServer.cpp b/src/RPCServer.cpp index 62921c959..f72e25671 100644 --- a/src/RPCServer.cpp +++ b/src/RPCServer.cpp @@ -389,7 +389,7 @@ Json::Value RPCServer::doConnect(Json::Value& params) } if(!theApp->getConnectionPool().connectTo(strIp, iPort)) - return JSONRPCError(500, "Unable to connect"); + return "connected"; return "connecting"; } From 76930ab175ecbe71d79b59cd1777cc83a0efddf6 Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Mon, 30 Apr 2012 13:20:54 -0700 Subject: [PATCH 3/6] Add configuration option peer_scan_interval_min. --- src/Config.cpp | 5 +++++ src/Config.h | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/src/Config.cpp b/src/Config.cpp index 62baef258..fad11cf7f 100644 --- a/src/Config.cpp +++ b/src/Config.cpp @@ -1,6 +1,7 @@ #include "Config.h" #include "ParseSection.h" +#include "utils.h" #include #include @@ -14,6 +15,7 @@ #define SECTION_VALIDATION_PASSWORD "validation_password" #define SECTION_VALIDATION_KEY "validation_key" #define SECTION_PEER_SSL_CIPHER_LIST "peer_ssl_cipher_list" +#define SECTION_PEER_SCAN_INTERVAL_MIN "peer_scan_interval_min" Config theConfig; @@ -36,6 +38,7 @@ Config::Config() DATA_DIR = "db/"; PEER_SSL_CIPHER_LIST = DEFAULT_PEER_SSL_CIPHER_LIST; + PEER_SCAN_INTERVAL_MIN = DEFAULT_PEER_SCAN_INTERVAL_MIN; TRANSACTION_FEE_BASE = 1000; } @@ -78,6 +81,8 @@ void Config::load() (void) sectionSingleB(secConfig, SECTION_VALIDATION_KEY, VALIDATION_KEY); (void) sectionSingleB(secConfig, SECTION_PEER_SSL_CIPHER_LIST, PEER_SSL_CIPHER_LIST); + (void) sectionSingleB(secConfig, SECTION_PEER_SCAN_INTERVAL_MIN, strTemp); + PEER_SCAN_INTERVAL_MIN=MAX(60, boost::lexical_cast(strTemp)); } } diff --git a/src/Config.h b/src/Config.h index 9974d4d94..1fe5ca57a 100644 --- a/src/Config.h +++ b/src/Config.h @@ -10,6 +10,9 @@ const int SYSTEM_PEER_PORT=6561; // Allow anonymous DH. #define DEFAULT_PEER_SSL_CIPHER_LIST "ALL:!LOW:!EXP:!MD5:@STRENGTH" +// 1 hour. +#define DEFAULT_PEER_SCAN_INTERVAL_MIN (60*60) + class Config { public: @@ -46,6 +49,7 @@ public: std::string VALIDATION_KEY; std::string PEER_SSL_CIPHER_LIST; + int PEER_SCAN_INTERVAL_MIN; // configuration parameters std::string DATA_DIR; From 71b5e97ad5ee946d0c83df3adf46899ce3bbb29a Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Mon, 30 Apr 2012 13:21:57 -0700 Subject: [PATCH 4/6] Change SQL tables to suppot scaning. --- src/DBInit.cpp | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/DBInit.cpp b/src/DBInit.cpp index 27304efd7..3df28db3e 100644 --- a/src/DBInit.cpp +++ b/src/DBInit.cpp @@ -56,6 +56,7 @@ const char *WalletDBInit[] = { Comment TEXT \ );", + // Node identity must be persisted for CAS routing and responsibilites. "CREATE TABLE NodeIdentity ( \ PublicKey CHARACTER(53), \ PrivateKey CHARACTER(52), \ @@ -179,7 +180,7 @@ const char *WalletDBInit[] = { PRIMARY KEY (Validator,Entry) \ );", - // Table of IPs to contact the nextwork. + // Table of IPs to contact the network. // IP: // IP address to contact. // Port: @@ -192,17 +193,22 @@ const char *WalletDBInit[] = { // 'M' = Manually added. // 'I' = Inbound connection. // 'O' = Other. - // Contact: - // Time of last contact. - // XXX Update on connect and hourly. + // ScanNext: + // When to next scan. Null=not scanning. + // ScanInterval: + // Delay between scans. "CREATE TABLE PeerIps ( \ IP TEXT NOT NULL, \ Port INTEGER NOT NULL DEFAULT -1, \ Score INTEGER NOT NULL, \ Source CHARACTER(1) NOT NULL, \ - Contact DATETIME, \ - PRIMARY KEY (IP,PORT) \ + ScanNext DATETIME DEFAULT 0, \ + ScanInterval INTEGER NOT NULL DEFAULT 0, \ + PRIMARY KEY (IP,Port) \ );", + + "CREATE INDEX PeerScanIndex ON \ + PeerIps(ScanNext);" }; #if 0 From b341e02ebad52e214e71d96677e9215a79664652 Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Mon, 30 Apr 2012 13:23:54 -0700 Subject: [PATCH 5/6] Initial scaning support --- src/Application.cpp | 1 + src/ConnectionPool.cpp | 164 ++++++++++++++++++++++++++++++++++++++--- src/ConnectionPool.h | 31 ++++++-- src/Peer.cpp | 19 ++++- src/Peer.h | 1 + 5 files changed, 197 insertions(+), 19 deletions(-) diff --git a/src/Application.cpp b/src/Application.cpp index 7c097cb26..01daae4b1 100644 --- a/src/Application.cpp +++ b/src/Application.cpp @@ -44,6 +44,7 @@ DatabaseCon::~DatabaseCon() Application::Application() : mUNL(mIOService), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL), + mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL) { nothing(); diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index 22c154f3e..e322cbe40 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -1,18 +1,19 @@ -#include -#include - #include "ConnectionPool.h" #include "Config.h" #include "Peer.h" #include "Application.h" #include "utils.h" -// XXX On Windows make sure OpenSSL PRNG is seeded: EGADS +#include +#include +#include +#include -ConnectionPool::ConnectionPool() : - iConnecting(0), - mCtx(boost::asio::ssl::context::sslv23) +ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) : + mCtx(boost::asio::ssl::context::sslv23), + bScanning(false), + mScanTimer(io_service) { mCtx.set_options( boost::asio::ssl::context::default_workarounds @@ -26,6 +27,9 @@ ConnectionPool::ConnectionPool() : void ConnectionPool::start() { // XXX Start running policy. + + // Start scanning. + scanRefresh(); } // XXX Broken: also don't send a message to a peer if we got it from the peer. @@ -75,6 +79,7 @@ bool ConnectionPool::peerRegister(Peer::pointer peer, const std::string& strIp, bool ConnectionPool::connectTo(const std::string& strIp, int iPort) { + bool bConnecting; ipPort ip = make_pair(strIp, iPort); boost::unordered_map::iterator it; @@ -94,15 +99,19 @@ bool ConnectionPool::connectTo(const std::string& strIp, int iPort) mIpMap[ip] = peer; peer->connect(strIp, iPort); + + bConnecting = true; } else { // Found it. Already connected. std::cerr << "ConnectionPool::connectTo: Already connected: " << strIp << " " << iPort << std::endl; + + bConnecting = false; } - return true; + return bConnecting; } Json::Value ConnectionPool::getPeersJson() @@ -129,7 +138,7 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& na) if (na == theApp->getWallet().getNodePublic()) { std::cerr << "ConnectionPool::peerConnected: To self." << std::endl; - bSuccess = false; + bSuccess = false; } else { @@ -183,6 +192,143 @@ void ConnectionPool::peerDisconnected(Peer::pointer peer, const ipPort& ipPeer, } } +void ConnectionPool::peerFailed(const std::string& strIp, int iPort) +{ + if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort) + { + bScanning = false; + scanRefresh(); + } +} + +void ConnectionPool::peerVerified(const std::string& strIp, int iPort) +{ + if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort) + { + // Scan completed successfully. + { + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + Database *db=theApp->getWalletDB()->getDB(); + + db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=NULL,ScanInterval=0 WHERE IP=%s AND Port=%d;") + % db->escape(strIp) + % iPort)); + // XXX Check error. + } + + bScanning = false; + scanRefresh(); + } +} + +void ConnectionPool::scanHandler(const boost::system::error_code& ecResult) +{ + if (ecResult == boost::asio::error::operation_aborted) + { + nothing(); + } + else if (!ecResult) + { + scanRefresh(); + } + else + { + throw std::runtime_error("Internal error: unexpected deadline error."); + } +} + +// Scan ips as per db entries. +void ConnectionPool::scanRefresh() +{ + if (bScanning) + { + // Currently scanning, will scan again after completion. + std::cerr << "scanRefresh: already scanning" << std::endl; + + nothing(); + } + else + { + // Discover if there are entries that need scanning. + boost::posix_time::ptime tpNext; + boost::posix_time::ptime tpNow; + std::string strIp; + int iPort; + int iInterval; + + { + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + Database *db=theApp->getWalletDB()->getDB(); + + if (db->executeSQL("SELECT * FROM PeerIps INDEXED BY PeerScanIndex WHERE ScanNext NOT NULL ORDER BY ScanNext LIMIT 1;") + && db->startIterRows()) + { + // Have an entry to scan. + int iNext = db->getInt("ScanNext"); + + tpNext = ptFromSeconds(iNext); + tpNow = boost::posix_time::second_clock::universal_time(); + + db->getStr("IP", strIp); + iPort = db->getInt("Port"); + iInterval = db->getInt("ScanInterval"); + } + else + { + // No entries to scan. + tpNow = boost::posix_time::ptime(boost::posix_time::not_a_date_time); + } + } + + if (tpNow.is_not_a_date_time()) + { + std::cerr << "scanRefresh: no scan needed." << std::endl; + + (void) mScanTimer.cancel(); + } + else if (tpNext <= tpNow) + { + // Scan it. + (void) mScanTimer.cancel(); + + std::cerr << "scanRefresh: scanning: " << strIp << " " << iPort << std::endl; + bScanning = true; + mScanIp = strIp; + mScanPort = iPort; + + iInterval *= 2; + iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN); + + tpNext = tpNow + boost::posix_time::seconds(iInterval); + + { + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + Database *db=theApp->getWalletDB()->getDB(); + + db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IP=%s AND Port=%d;") + % iToSeconds(tpNext) + % iInterval + % db->escape(strIp) + % iPort)); + // XXX Check error. + } + + if (!connectTo(mScanIp, mScanPort)) + { + // Already connected. Try again. + scanRefresh(); + } + } + else + { + std::cerr << "scanRefresh: next due: " << tpNow << std::endl; + + mScanTimer.expires_at(tpNext); + mScanTimer.async_wait(boost::bind(&ConnectionPool::scanHandler, this, _1)); + } + } +} + #if 0 bool ConnectionPool::isMessageKnown(PackedMessage::pointer msg) { diff --git a/src/ConnectionPool.h b/src/ConnectionPool.h index 521090d94..d30f1a581 100644 --- a/src/ConnectionPool.h +++ b/src/ConnectionPool.h @@ -16,22 +16,25 @@ class ConnectionPool private: boost::mutex mPeerLock; - typedef std::pair naPeer; - - // Count of peers we are in progress of connecting to. - // We are in progress until we know their network public key. - int iConnecting; + typedef std::pair naPeer; // Peers we are connecting with and non-thin peers we are connected to. - boost::unordered_map mIpMap; + boost::unordered_map mIpMap; // Non-thin peers which we are connected to. - boost::unordered_map mConnectedMap; + boost::unordered_map mConnectedMap; boost::asio::ssl::context mCtx; + bool bScanning; + boost::asio::deadline_timer mScanTimer; + std::string mScanIp; + int mScanPort; + + void scanHandler(const boost::system::error_code& ecResult); + public: - ConnectionPool(); + ConnectionPool(boost::asio::io_service& io_service); // Begin enforcing connection policy. void start(); @@ -56,8 +59,20 @@ public: // No longer connected. void peerDisconnected(Peer::pointer peer, const ipPort& ipPeer, const NewcoinAddress& naPeer); + // As client accepted. + void peerVerified(const std::string& strIp, int iPort); + + // As client failed connect and be accepted. + void peerFailed(const std::string& strIp, int iPort); + Json::Value getPeersJson(); + // + // Scanning + // + + void scanRefresh(); + #if 0 //std::vector > mBroadcastMessages; diff --git a/src/Peer.cpp b/src/Peer.cpp index 842bc6d13..85e228643 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -63,6 +63,9 @@ void Peer::detach() // mSocketSsl.close(); if (!mIpPort.first.empty()) { + if (mClientConnect) + theApp->getConnectionPool().peerFailed(mIpPort.first, mIpPort.second); + theApp->getConnectionPool().peerDisconnected(shared_from_this(), mIpPort, mNodePublic); mIpPort.first.clear(); } @@ -97,6 +100,8 @@ void Peer::connect(const std::string strIp, int iPort) { int iPortAct = iPort < 0 ? SYSTEM_PEER_PORT : iPort; + mClientConnect = true; + std::cerr << "Peer::connect: " << strIp << " " << iPort << std::endl; mIpPort = make_pair(strIp, iPort); @@ -156,7 +161,7 @@ void Peer::handleStart(const boost::system::error_code& error) } } -// Connect as client. +// Connect ssl as client. void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it) { if (error) @@ -176,13 +181,15 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip } } -// Connect as server. +// Connect ssl as server. void Peer::connected(const boost::system::error_code& error) { boost::asio::ip::tcp::endpoint ep = mSocketSsl.lowest_layer().remote_endpoint(); int iPort = ep.port(); std::string strIp = ep.address().to_string(); + mClientConnect = false; + if (iPort == SYSTEM_PEER_PORT) iPort = -1; @@ -502,6 +509,14 @@ void Peer::recvHello(newcoin::TMHello& packet) // Cancel verification timeout. (void) mVerifyTimer.cancel(); + if (mClientConnect) + { + theApp->getConnectionPool().peerVerified(mIpPort.first, mIpPort.second); + + // No longer connecting as client. + mClientConnect = false; + } + // XXX Set timer: connection is in grace period to be useful. // XXX Set timer: connection idle (idle may vary depending on connection type.) diff --git a/src/Peer.h b/src/Peer.h index 1c4240934..f8992840c 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -31,6 +31,7 @@ public: void handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it); private: + bool mClientConnect; // In process of connecting as client. NewcoinAddress mNodePublic; // Node public key of peer. ipPort mIpPort; uint256 mCookieHash; From 39606523f70616fb172ed10f1f544fe4f9e683a9 Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Mon, 30 Apr 2012 13:24:16 -0700 Subject: [PATCH 6/6] Refresh scaning after scoring. --- src/UniqueNodeList.cpp | 7 +++++-- src/Wallet.cpp | 1 - 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/UniqueNodeList.cpp b/src/UniqueNodeList.cpp index afa17c2fa..a6e1a2274 100644 --- a/src/UniqueNodeList.cpp +++ b/src/UniqueNodeList.cpp @@ -449,6 +449,9 @@ void UniqueNodeList::scoreTimerHandler(const boost::system::error_code& err) // Score again if needed. scoreNext(false); + + // Scan may be dirty due to new ips. + theApp->getConnectionPool().scanRefresh(); } } @@ -923,14 +926,14 @@ void UniqueNodeList::fetchNext() if (!bFull) { // Determine next scan. - std::string strDomain; + std::string strDomain; boost::posix_time::ptime tpNext; boost::posix_time::ptime tpNow; ScopedLock sl(theApp->getWalletDB()->getDBLock()); Database *db=theApp->getWalletDB()->getDB(); - if (db->executeSQL("SELECT Domain,Next FROM SeedDomains ORDER BY Next LIMIT 1;") + if (db->executeSQL("SELECT Domain,Next FROM SeedDomains INDEXED BY SeedDomainNext ORDER BY Next LIMIT 1;") && db->startIterRows()) { int iNext = db->getInt("Next"); diff --git a/src/Wallet.cpp b/src/Wallet.cpp index 5ed60a168..426e84700 100644 --- a/src/Wallet.cpp +++ b/src/Wallet.cpp @@ -371,7 +371,6 @@ bool Wallet::nodeIdentityLoad() ScopedLock sl(theApp->getWalletDB()->getDBLock()); bool bSuccess = false; - if(db->executeSQL("SELECT * FROM NodeIdentity;") && db->startIterRows()) { std::string strPublicKey, strPrivateKey;