diff --git a/src/Config.cpp b/src/Config.cpp index be65c19846..e615520535 100644 --- a/src/Config.cpp +++ b/src/Config.cpp @@ -205,7 +205,9 @@ void Config::load() VALIDATION_SEED.setSeedGeneric(strTemp); (void) sectionSingleB(secConfig, SECTION_PEER_SSL_CIPHER_LIST, PEER_SSL_CIPHER_LIST); + if (sectionSingleB(secConfig, SECTION_PEER_SCAN_INTERVAL_MIN, strTemp)) + // Minimum for min is 60 seconds. PEER_SCAN_INTERVAL_MIN = MAX(60, boost::lexical_cast(strTemp)); if (sectionSingleB(secConfig, SECTION_PEER_START_MAX, strTemp)) diff --git a/src/Config.h b/src/Config.h index 452e652988..fecbfcf9a2 100644 --- a/src/Config.h +++ b/src/Config.h @@ -27,8 +27,10 @@ const int SYSTEM_PEER_PORT = 6561; // Allow anonymous DH. #define DEFAULT_PEER_SSL_CIPHER_LIST "ALL:!LOW:!EXP:!MD5:@STRENGTH" -// 1 hour. -#define DEFAULT_PEER_SCAN_INTERVAL_MIN (60*60) +// Normal, recommend 1 hour. +// #define DEFAULT_PEER_SCAN_INTERVAL_MIN (60*60) +// Testing, recommend 1 minute. +#define DEFAULT_PEER_SCAN_INTERVAL_MIN (60) // Maximum number of peers to try to connect to as client at once. #define DEFAULT_PEER_START_MAX 5 diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index 6e7302586b..12373a0da6 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -52,13 +52,24 @@ void ConnectionPool::start() bool ConnectionPool::getTopNAddrs(int n,std::vector& addrs) { - Database* db = theApp->getWalletDB()->getDB(); - ScopedLock sl(theApp->getWalletDB()->getDBLock()); - SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps limit %d") % n) ) + // XXX Filter out other local addresses (like ipv6) + if (!theConfig.PEER_IP.empty() && theConfig.PEER_IP != "127.0.0.1") { - std::string str; - db->getStr(0,str); - addrs.push_back(str); + addrs.push_back(str(boost::format("%s %d") % theConfig.PEER_IP % theConfig.PEER_PORT)); + } + + { + Database* db = theApp->getWalletDB()->getDB(); + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + + SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps LIMIT %d") % n) ) + { + std::string str; + + db->getStr(0,str); + + addrs.push_back(str); + } } return true; @@ -68,15 +79,15 @@ bool ConnectionPool::savePeer(const std::string& strIp, int iPort,char code) { Database* db = theApp->getWalletDB()->getDB(); - std::string ipPort=db->escape(str(boost::format("%s %d") % strIp % iPort)); + std::string ipPort= sqlEscape(str(boost::format("%s %d") % strIp % iPort)); ScopedLock sl(theApp->getWalletDB()->getDBLock()); - std::string sql=str(boost::format("SELECT count(*) FROM PeerIps WHERE IpPort=%s;") % ipPort); + std::string sql=str(boost::format("SELECT COUNT(*) FROM PeerIps WHERE IpPort=%s;") % ipPort); if (db->executeSQL(sql) && db->startIterRows()) { if ( db->getInt(0)==0) { - db->executeSQL(str(boost::format("INSERT INTO PeerIps (IpPort,Score,Source) values (%s,0,'%c');") % ipPort % code)); + db->executeSQL(str(boost::format("INSERT INTO PeerIps (IpPort,Score,Source) values (%s,0,'%c');") % ipPort % code)); return true; }// else we already had this peer } @@ -154,7 +165,7 @@ void ConnectionPool::policyLowWater() else { // Try to start connection. - if (!connectTo(strIp, iPort)) + if (!peerConnect(strIp, iPort)) throw std::runtime_error("Internal error: standby was already connected."); // Check if we need more. @@ -201,7 +212,6 @@ void ConnectionPool::policyHandler(const boost::system::error_code& ecResult) } } -// XXX Broken: also don't send a message to a peer if we got it from the peer. void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg) { boost::mutex::scoped_lock sl(mPeerLock); @@ -216,40 +226,27 @@ void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg) } } -// Inbound connection, false=reject -// Reject addresses we already have in our table. -// XXX Reject, if we have too many connections. -bool ConnectionPool::peerRegister(Peer::pointer peer, const std::string& strIp, int iPort) +// Add or modify into PeerIps as a manual entry for immediate scanning. +// Requires sane IP and port. +void ConnectionPool::connectTo(const std::string& strIp, int iPort) { - bool bAccept; - ipPort ip = make_pair(strIp, iPort); + Database* db = theApp->getWalletDB()->getDB(); + std::string ipPort = sqlEscape(str(boost::format("%s %d") % strIp % iPort)); - boost::unordered_map::iterator it; - - boost::mutex::scoped_lock sl(mPeerLock); - - it = mIpMap.find(ip); - - if (it == mIpMap.end()) { - // Did not find it. Not already connecting or connected. + ScopedLock sl(theApp->getWalletDB()->getDBLock()); - std::cerr << "ConnectionPool::peerRegister: " << ip.first << " " << ip.second << std::endl; - // Mark as connecting. - mIpMap[ip] = peer; - bAccept = true; - } - else - { - // Found it. Already connected or connecting. - - bAccept = false; + db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IpPort,Score,Source,ScanNext) values (%s,%d,'%c',0);") + % ipPort + % theApp->getUNL().iSourceScore(UniqueNodeList::vsManual) + % char(UniqueNodeList::vsManual))); } - return bAccept; + scanRefresh(); } -bool ConnectionPool::connectTo(const std::string& strIp, int iPort) +// <-- true, if already connected. +bool ConnectionPool::peerConnect(const std::string& strIp, int iPort) { bool bConnecting; ipPort ip = make_pair(strIp, iPort); @@ -263,7 +260,7 @@ bool ConnectionPool::connectTo(const std::string& strIp, int iPort) if (it == mIpMap.end()) { // Did not find it. Not already connecting or connected. - std::cerr << "ConnectionPool::connectTo: Connecting: " + std::cerr << "ConnectionPool::peerConnect: Connecting: " << strIp << " " << iPort << std::endl; Peer::pointer peer(Peer::create(theApp->getIOService(), mCtx)); @@ -279,7 +276,7 @@ bool ConnectionPool::connectTo(const std::string& strIp, int iPort) else { // Found it. Already connected. - std::cerr << "ConnectionPool::connectTo: Already connected: " + std::cerr << "ConnectionPool::peerConnect: Already connected: " << strIp << " " << iPort << std::endl; bConnecting = false; @@ -319,15 +316,15 @@ std::vector ConnectionPool::getPeerVector() } // Now know peer's node public key. Determine if we want to stay connected. -bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& na) +bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naPeer, const std::string& strIP, int iPort) { bool bSuccess; - std::cerr << "ConnectionPool::peerConnected: " << na.humanNodePublic() - << " " << peer->getIP() << " " << peer->getPort() - << std::endl; + std::cerr << "ConnectionPool::peerConnected: " + << naPeer.humanNodePublic() << " " << strIP << " " << iPort << std::endl; assert(!!peer); - if (na == theApp->getWallet().getNodePublic()) + + if (naPeer == theApp->getWallet().getNodePublic()) { std::cerr << "ConnectionPool::peerConnected: To self." << std::endl; bSuccess = false; @@ -335,24 +332,43 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& na) else { boost::mutex::scoped_lock sl(mPeerLock); + boost::unordered_map::iterator itCm = mConnectedMap.find(naPeer); - mConnectedMap[na] = peer; - bSuccess = true; + if (itCm == mConnectedMap.end()) + { + // New connection. + mConnectedMap[naPeer] = peer; + bSuccess = true; + } + else + { + // Found in map, already connected. + if (!strIP.empty()) + { + // Was an outbound connection, we know IP and port. + // Note in previous connection how to reconnect. + itCm->second->peerIpPort(strIP, iPort); + } + + bSuccess = false; // Don't need a redundant connection. + } } - return bSuccess; } -void ConnectionPool::peerDisconnected(Peer::pointer peer, const ipPort& ipPeer, const NewcoinAddress& naPeer) +// We maintain a map of public key to peer for connectted and verified peers. Maintain it. +void ConnectionPool::peerDisconnected(Peer::pointer peer, const NewcoinAddress& naPeer) { - std::cerr << "ConnectionPool::peerDisconnected: " << ipPeer.first << " " << ipPeer.second << std::endl; - - boost::mutex::scoped_lock sl(mPeerLock); + std::cerr << "ConnectionPool::peerDisconnected: " << peer->getIP() << " " << peer->getPort() << std::endl; if (naPeer.isValid()) { - boost::unordered_map::iterator itCm = mConnectedMap.find(naPeer); + boost::unordered_map::iterator itCm; + + boost::mutex::scoped_lock sl(mPeerLock); + + itCm = mConnectedMap.find(naPeer); if (itCm == mConnectedMap.end()) { @@ -366,25 +382,53 @@ void ConnectionPool::peerDisconnected(Peer::pointer peer, const ipPort& ipPeer, mConnectedMap.erase(itCm); } } +} - boost::unordered_map::iterator itIp = mIpMap.find(ipPeer); +void ConnectionPool::peerScanSet(const std::string& strIp, int iPort) +{ + std::cerr << "ConnectionPool::peerScanSet: " << strIp << " " << iPort << std::endl; - if (itIp == mIpMap.end()) + std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); + + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + Database* db = theApp->getWalletDB()->getDB(); + + if (db->executeSQL(str(boost::format("SELECT ScanNext FROM PeerIps WHERE IpPort=%s;") + % sqlEscape(strIpPort))) + && db->startIterRows()) { - // Did not find it. Not already connecting or connected. - std::cerr << "Internal Error: peer wasn't connected: " - << ipPeer.first << " " << ipPeer.second << std::endl; - // XXX Bad error. + if (db->getNull("ScanNext")) + { + // Non-scanning connection terminated. Schedule for scanning. + int iInterval = theConfig.PEER_SCAN_INTERVAL_MIN; + boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); + boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval); + + std::cerr << str(boost::format("peerScanSet: scan schedule: %s %s (next %s, delay=%s)") + % mScanIp % mScanPort % tpNext % iInterval) << std::endl; + + db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;") + % iToSeconds(tpNext) + % iInterval + % db->escape(strIpPort))); + } + else + { + // Scanning connection terminate, already scheduled for retry. + nothing(); + } } else { - // Found it. Delete it. - mIpMap.erase(itIp); + std::cerr << "peerScanSet: peer wasn't in PeerIps: " << strIp << " " << iPort << std::endl; } } void ConnectionPool::peerFailed(const std::string& strIp, int iPort) { + std::cerr << "ConnectionPool::peerFailed: " << strIp << " " << iPort << std::endl; + ipPort ipPeer = make_pair(strIp, iPort); + // If the fail was our scan, we are no longer scanning. if (bScanning && !mScanIp.compare(strIp) && mScanPort == iPort) { @@ -393,6 +437,36 @@ void ConnectionPool::peerFailed(const std::string& strIp, int iPort) // Look for more to scan. scanRefresh(); } + + bool bScanSet = false; + + { + boost::mutex::scoped_lock sl(mPeerLock); + boost::unordered_map::iterator itIp; + + itIp = mIpMap.find(ipPeer); + + if (itIp == mIpMap.end()) + { + // Did not find it. Not already connecting or connected. + std::cerr << "Internal Error: peer wasn't connected: " + << ipPeer.first << " " << ipPeer.second << std::endl; + // XXX Bad error. + } + else + { + // Found it. Delete it. + mIpMap.erase(itIp); + + bScanSet = true; + } + } + + if (bScanSet) + { + // Schedule for scanning. + peerScanSet(ipPeer.first, ipPeer.second); + } } void ConnectionPool::peerVerified(const std::string& strIp, int iPort) @@ -400,6 +474,9 @@ void ConnectionPool::peerVerified(const std::string& strIp, int iPort) if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort) { std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); + + std::cerr << str(boost::format("peerVerified: %s %s (scan off)") % mScanIp % mScanPort) << std::endl; + // Scan completed successfully. { ScopedLock sl(theApp->getWalletDB()->getDBLock()); @@ -485,7 +562,6 @@ void ConnectionPool::scanRefresh() (void) mScanTimer.cancel(); - std::cerr << "scanRefresh: scanning: " << mScanIp << " " << mScanPort << std::endl; bScanning = true; iInterval *= 2; @@ -493,6 +569,9 @@ void ConnectionPool::scanRefresh() tpNext = tpNow + boost::posix_time::seconds(iInterval); + std::cerr << str(boost::format("scanRefresh: now scanning: %s %s (next %s, delay=%s)") + % mScanIp % mScanPort % tpNext % iInterval) << std::endl; + { ScopedLock sl(theApp->getWalletDB()->getDBLock()); Database *db=theApp->getWalletDB()->getDB(); @@ -504,7 +583,7 @@ void ConnectionPool::scanRefresh() // XXX Check error. } - if (!connectTo(mScanIp, mScanPort)) + if (!peerConnect(mScanIp, mScanPort)) { // Already connected. Try again. scanRefresh(); diff --git a/src/ConnectionPool.h b/src/ConnectionPool.h index 656bbf1b94..41a7182ea2 100644 --- a/src/ConnectionPool.h +++ b/src/ConnectionPool.h @@ -20,9 +20,14 @@ private: typedef std::pair pipPeer; // Peers we are connecting with and non-thin peers we are connected to. + // Only peers we know the connection ip for are listed. + // We know the ip and port for: + // - All outbound connections + // - Some inbound connections (which we figured out). boost::unordered_map mIpMap; // Non-thin peers which we are connected to. + // Peers we have the public key for. boost::unordered_map mConnectedMap; boost::asio::ssl::context mCtx; @@ -41,7 +46,10 @@ private: // Peers we are establishing a connection with as a client. // int miConnectStarting; - bool peerAvailable(std::string& strIp, int& iPort); + bool peerAvailable(std::string& strIp, int& iPort); + void peerScanSet(const std::string& strIp, int iPort); + + bool peerConnect(const std::string& strIp, int iPort); public: ConnectionPool(boost::asio::io_service& io_service); @@ -54,7 +62,7 @@ public: // Manual connection request. // Queue for immediate scanning. - bool connectTo(const std::string& strIp, int iPort); + void connectTo(const std::string& strIp, int iPort); // // Peer connectivity notification. @@ -62,14 +70,12 @@ public: bool getTopNAddrs(int n,std::vector& addrs); bool savePeer(const std::string& strIp, int iPort, char code); - // Inbound connection, false=reject - bool peerRegister(Peer::pointer peer, const std::string& strIp, int iPort); - - // We know peers node public key. false=reject - bool peerConnected(Peer::pointer peer, const NewcoinAddress& na); + // We know peers node public key. + // <-- bool: false=reject + bool peerConnected(Peer::pointer peer, const NewcoinAddress& naPeer, const std::string& strIP, int iPort); // No longer connected. - void peerDisconnected(Peer::pointer peer, const ipPort& ipPeer, const NewcoinAddress& naPeer); + void peerDisconnected(Peer::pointer peer, const NewcoinAddress& naPeer); // As client accepted. void peerVerified(const std::string& strIp, int iPort); diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index bf6c086b40..b749a76bc0 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -251,11 +251,8 @@ void NetworkOPs::setStateTimer(int sec) uint64 consensusTime = mLedgerMaster->getCurrentLedger()->getCloseTimeNC() - LEDGER_WOBBLE_TIME; uint64 now = getNetworkTimeNC(); - if ((mMode == omFULL) && !mConsensus) - { - if (now >= consensusTime) sec = 0; - else if (sec > (consensusTime - now)) sec = (consensusTime - now); - } + if (now >= consensusTime) sec = 0; + else if (sec > (consensusTime - now)) sec = (consensusTime - now); } mNetTimer.expires_from_now(boost::posix_time::seconds(sec)); mNetTimer.async_wait(boost::bind(&NetworkOPs::checkState, this, boost::asio::placeholders::error)); @@ -429,19 +426,13 @@ void NetworkOPs::checkState(const boost::system::error_code& result) // check if the ledger is bad enough to go to omTRACKING } - if (mMode != omFULL) - { - setStateTimer(4); - return; - } - int secondsToClose = theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC() - theApp->getOPs().getNetworkTimeNC(); if ((!mConsensus) && (secondsToClose < LEDGER_WOBBLE_TIME)) // pre close wobble beginConsensus(theApp->getMasterLedger().getCurrentLedger()); if (mConsensus) setStateTimer(mConsensus->timerEntry()); - else setStateTimer(10); + else setStateTimer(4); } void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger) diff --git a/src/Peer.cpp b/src/Peer.cpp index df3ea8fa78..f90b2935a4 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -60,22 +60,32 @@ void Peer::handle_write(const boost::system::error_code& error, size_t bytes_tra void Peer::detach(const char *rsn) { #ifdef DEBUG - Log(lsTRACE) << "DETACHING PEER: " << rsn; + Log(lsTRACE) << "DETACHING PEER: " << rsn + << ": " + << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") + << " " << getIP() << " " << getPort() << std::endl; #endif + boost::system::error_code ecCancel; (void) mVerifyTimer.cancel(); mSendQ.clear(); + if (mNodePublic.isValid()) + { + theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic); + + mNodePublic.clear(); // Be idompotent. + } + if (!mIpPort.first.empty()) { - if (mClientConnect) - // Connection might be part of scanning. Inform connect failed. - theApp->getConnectionPool().peerFailed(mIpPort.first, mIpPort.second); + // Connection might be part of scanning. Inform connect failed. + // Might need to scan. Inform connection disconnected. + theApp->getConnectionPool().peerFailed(mIpPort.first, mIpPort.second); - theApp->getConnectionPool().peerDisconnected(shared_from_this(), mIpPort, mNodePublic); - mIpPort.first.clear(); + mIpPort.first.empty(); // Be idompotent. } } @@ -191,7 +201,8 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip } } -// Connect ssl as server. +// Connect ssl as server to an inbound connection. +// - We don't bother remembering the inbound IP or port. Only useful for debugging. void Peer::connected(const boost::system::error_code& error) { boost::asio::ip::tcp::endpoint ep = mSocketSsl.lowest_layer().remote_endpoint(); @@ -208,21 +219,12 @@ void Peer::connected(const boost::system::error_code& error) std::cerr << "Remote peer: accept error: " << strIp << " " << iPort << " : " << error << std::endl; detach("ctd"); } - else if (!theApp->getConnectionPool().peerRegister(shared_from_this(), strIp, iPort)) - { - std::cerr << "Remote peer: rejecting: " << strIp << " " << iPort << std::endl; - // XXX Reject with a rejection message: already connected - detach("ctd2"); - } else { // Not redundant ip and port, add to connection list. std::cerr << "Remote peer: accepted: " << strIp << " " << iPort << std::endl; - mIpPort = make_pair(strIp, iPort); - assert(!mIpPort.first.empty()); - mSocketSsl.set_verify_mode(boost::asio::ssl::verify_none); mSocketSsl.async_handshake(boost::asio::ssl::stream::server, @@ -241,9 +243,9 @@ void Peer::sendPacketForce(PackedMessage::pointer packet) void Peer::sendPacket(PackedMessage::pointer packet) { - if(packet) + if (packet) { - if(mSendingPacket) + if (mSendingPacket) { mSendQ.push_back(packet); } @@ -279,7 +281,7 @@ void Peer::handle_read_header(const boost::system::error_code& error) { unsigned msg_len = PackedMessage::getLength(mReadbuf); // WRITEME: Compare to maximum message length, abort if too large - if(msg_len>(32*1024*1024)) + if (msg_len>(32*1024*1024)) { detach("hrh"); return; @@ -307,7 +309,6 @@ void Peer::handle_read_body(const boost::system::error_code& error) } } - void Peer::processReadBuffer() { int type = PackedMessage::getType(mReadbuf); @@ -330,7 +331,7 @@ void Peer::processReadBuffer() case newcoin::mtHELLO: { newcoin::TMHello msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvHello(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -339,7 +340,7 @@ void Peer::processReadBuffer() case newcoin::mtERROR_MSG: { newcoin::TMErrorMsg msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvErrorMessage(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -348,7 +349,7 @@ void Peer::processReadBuffer() case newcoin::mtPING: { newcoin::TMPing msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvPing(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -357,7 +358,7 @@ void Peer::processReadBuffer() case newcoin::mtGET_CONTACTS: { newcoin::TMGetContacts msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvGetContacts(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -366,7 +367,7 @@ void Peer::processReadBuffer() case newcoin::mtCONTACT: { newcoin::TMContact msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvContact(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -374,7 +375,7 @@ void Peer::processReadBuffer() case newcoin::mtGET_PEERS: { newcoin::TMGetPeers msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvGetPeers(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -382,7 +383,7 @@ void Peer::processReadBuffer() case newcoin::mtPEERS: { newcoin::TMPeers msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvPeers(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -391,7 +392,7 @@ void Peer::processReadBuffer() case newcoin::mtSEARCH_TRANSACTION: { newcoin::TMSearchTransaction msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvSearchTransaction(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -400,7 +401,7 @@ void Peer::processReadBuffer() case newcoin::mtGET_ACCOUNT: { newcoin::TMGetAccount msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvGetAccount(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -409,7 +410,7 @@ void Peer::processReadBuffer() case newcoin::mtACCOUNT: { newcoin::TMAccount msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvAccount(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -418,7 +419,7 @@ void Peer::processReadBuffer() case newcoin::mtTRANSACTION: { newcoin::TMTransaction msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvTransaction(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -427,7 +428,7 @@ void Peer::processReadBuffer() case newcoin::mtSTATUS_CHANGE: { newcoin::TMStatusChange msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvStatus(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -436,7 +437,7 @@ void Peer::processReadBuffer() case newcoin::mtPROPOSE_LEDGER: { newcoin::TMProposeSet msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvPropose(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -445,7 +446,7 @@ void Peer::processReadBuffer() case newcoin::mtGET_LEDGER: { newcoin::TMGetLedger msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvGetLedger(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -454,7 +455,7 @@ void Peer::processReadBuffer() case newcoin::mtLEDGER_DATA: { newcoin::TMLedgerData msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvLedger(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -463,7 +464,7 @@ void Peer::processReadBuffer() case newcoin::mtHAVE_SET: { newcoin::TMHaveTransactionSet msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvHaveTxSet(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -472,7 +473,7 @@ void Peer::processReadBuffer() case newcoin::mtVALIDATION: { newcoin::TMValidation msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvValidation(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -481,7 +482,7 @@ void Peer::processReadBuffer() case newcoin::mtGET_VALIDATION: { newcoin::TM msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recv(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -491,7 +492,7 @@ void Peer::processReadBuffer() case newcoin::mtGET_OBJECT: { newcoin::TMGetObjectByHash msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvGetObjectByHash(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -500,7 +501,7 @@ void Peer::processReadBuffer() case newcoin::mtOBJECT: { newcoin::TMObjectByHash msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvObjectByHash(msg); else std::cerr << "parse error: " << type << std::endl; } @@ -530,18 +531,20 @@ void Peer::recvHello(newcoin::TMHello& packet) { // Unable to verify they have private key for claimed public key. std::cerr << "Recv(Hello): Disconnect: Failed to verify session." << std::endl; } - else if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic)) + else if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort())) { // Already connected, self, or some other reason. std::cerr << "Recv(Hello): Disconnect: Extraneous connection." << std::endl; } else { // Successful connection. + std::cerr << "Recv(Hello): Connect: " << mNodePublic.humanNodePublic() << std::endl; // Cancel verification timeout. (void) mVerifyTimer.cancel(); if (mClientConnect) { + // If we connected due to scan, no longer need to scan. theApp->getConnectionPool().peerVerified(mIpPort.first, mIpPort.second); // No longer connecting as client. @@ -569,9 +572,6 @@ void Peer::recvHello(newcoin::TMHello& packet) mClosedLedgerTime = boost::posix_time::second_clock::universal_time(); } - - theApp->getConnectionPool().savePeer(getIP(),packet.ipv4port(),'I'); - bDetach = false; } @@ -713,8 +713,10 @@ void Peer::recvGetContacts(newcoin::TMGetContacts& packet) void Peer::recvGetPeers(newcoin::TMGetPeers& packet) { std::vector addrs; + theApp->getConnectionPool().getTopNAddrs(30,addrs); - if(addrs.size()) + + if (addrs.size()) { newcoin::TMPeers peers; @@ -722,7 +724,9 @@ void Peer::recvGetPeers(newcoin::TMGetPeers& packet) { std::string strIP; int port; - splitIpPort(addrs[n],strIP,port); + + splitIpPort(addrs[n], strIP, port); + newcoin::TMIPv4EndPoint* addr=peers.add_nodes(); addr->set_ipv4(inet_addr(strIP.c_str())); addr->set_ipv4port(port); @@ -735,21 +739,29 @@ void Peer::recvGetPeers(newcoin::TMGetPeers& packet) sendPacket(message); } } + // TODO: filter out all the LAN peers void Peer::recvPeers(newcoin::TMPeers& packet) { - for(int i = 0; i < packet.nodes().size(); ++i) + for (int i = 0; i < packet.nodes().size(); ++i) { in_addr addr; addr.s_addr=packet.nodes(i).ipv4(); - std::string strIP( inet_ntoa(addr)); + std::string strIP(inet_ntoa(addr)); int port=packet.nodes(i).ipv4port(); - std::cout << "Learning about: " << strIP << std::endl; + if (strIP == "0.0.0.0") + { + strIP = mSocketSsl.lowest_layer().remote_endpoint().address().to_string(); + } - theApp->getConnectionPool().savePeer(strIP,port,'T'); + // if (strIP != "127.0.0.1") + { + std::cout << "Learning about: " << strIP << std::endl; + + theApp->getConnectionPool().savePeer(strIP, port, 'T'); + } } - } void Peer::recvIndexedObject(newcoin::TMIndexedObject& packet) diff --git a/src/Peer.h b/src/Peer.h index d458b05886..99c0204af2 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -96,8 +96,10 @@ public: //bool operator == (const Peer& other); - std::string& getIP(){ return(mIpPort.first); } - int getPort(){ return(mIpPort.second); } + std::string& getIP() { return mIpPort.first; } + int getPort() { return mIpPort.second; } + + void peerIpPort(const std::string& strIP, int iPort) { mIpPort = make_pair(strIP, iPort); } static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) { diff --git a/src/RPCServer.cpp b/src/RPCServer.cpp index 46dcb53b2f..c215d10f3e 100644 --- a/src/RPCServer.cpp +++ b/src/RPCServer.cpp @@ -723,8 +723,8 @@ Json::Value RPCServer::doConnect(Json::Value& params) iPort = boost::lexical_cast(strPort); } - if (!theApp->getConnectionPool().connectTo(strIp, iPort)) - return "connected"; + // XXX Validate legal IP and port + theApp->getConnectionPool().connectTo(strIp, iPort); return "connecting"; } diff --git a/src/UniqueNodeList.cpp b/src/UniqueNodeList.cpp index c49bfc70e7..86b47ea1a9 100644 --- a/src/UniqueNodeList.cpp +++ b/src/UniqueNodeList.cpp @@ -904,7 +904,7 @@ void UniqueNodeList::responseFetch(const std::string strDomain, const boost::sys else { std::cerr - << boost::format("Validator: '%s' unabile to retrieve " NODE_FILE_NAME ": %s") + << boost::format("Validator: '%s' unable to retrieve " NODE_FILE_NAME ": %s") % strDomain % err.message() << std::endl; diff --git a/src/UniqueNodeList.h b/src/UniqueNodeList.h index 2149c313fe..0e5e72bde8 100644 --- a/src/UniqueNodeList.h +++ b/src/UniqueNodeList.h @@ -92,7 +92,6 @@ private: void trustedLoad(); bool scoreRound(std::vector& vsnNodes); - int iSourceScore(validatorSource vsWhy); void responseFetch(const std::string strDomain, const boost::system::error_code& err, const std::string strSiteFile); @@ -156,6 +155,8 @@ public: void nodeNetwork(); Json::Value getUnlJson(); + + int iSourceScore(validatorSource vsWhy); }; #endif