From f7b5753be1eed072b78bf9cdc322b97ede1f6f2f Mon Sep 17 00:00:00 2001 From: jed Date: Wed, 13 Jun 2012 08:20:08 -0700 Subject: [PATCH] maintain db of peers --- src/Application.cpp | 4 +-- src/ConnectionPool.cpp | 43 ++++++++++++++++++++++++- src/ConnectionPool.h | 4 +++ src/Peer.cpp | 73 ++++++++++++++++++++++++++++++++++++++++-- src/Peer.h | 8 +++++ src/UniqueNodeList.cpp | 8 ++--- src/newcoin.proto | 15 +++++++++ src/utils.cpp | 17 ++++++++++ src/utils.h | 5 +++ 9 files changed, 167 insertions(+), 10 deletions(-) diff --git a/src/Application.cpp b/src/Application.cpp index 7ad1c870e..241a4b878 100644 --- a/src/Application.cpp +++ b/src/Application.cpp @@ -13,7 +13,7 @@ #include "key.h" #include "utils.h" #include "TaggedCache.h" - +#include "boost/filesystem.hpp" Application* theApp = NULL; @@ -21,7 +21,7 @@ DatabaseCon::DatabaseCon(const std::string& strName, const char *initStrings[], { boost::filesystem::path pPath = theConfig.DATA_DIR / strName; - mDatabase = new SqliteDatabase(pPath.c_str()); + mDatabase = new SqliteDatabase((const char*) pPath.c_str()); mDatabase->connect(); for(int i = 0; i < initCount; ++i) mDatabase->executeSQL(initStrings[i], true); diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index 74c5c0019..398a9bcec 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -11,7 +11,11 @@ #include #include -static void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort) +#define SQL_FOREACH(_db, _strQuery) \ + if ((_db)->executeSQL(_strQuery)) \ + for (bool _bMore = (db)->startIterRows(); _bMore; _bMore = (_db)->getNextRow()) + +void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort) { std::vector vIpPort; boost::split(vIpPort, strIpPort, boost::is_any_of(" ")); @@ -43,6 +47,40 @@ void ConnectionPool::start() scanRefresh(); } +bool ConnectionPool::getTopNAddrs(int n,std::vector& addrs) +{ + Database* db = theApp->getWalletDB()->getDB(); + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps limit %d") % n) ) + { + std::string str; + db->getStr(0,str); + addrs.push_back(str); + } + + return true; +} + +bool ConnectionPool::savePeer(const std::string& strIp, int iPort) +{ + Database* db = theApp->getWalletDB()->getDB(); + + std::string ipPort=db->escape(str(boost::format("%s %d") % strIp % iPort)); + + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + std::string sql=str(boost::format("SELECT count(*) FROM PeerIps WHERE IpPort=%s;") % ipPort); + if(db->executeSQL(sql) && db->startIterRows()) + { + if( db->getInt(0)==0) + { + db->executeSQL(str(boost::format("INSERT INTO PeerIps (IpPort,Score,Source) values (%s,0,'a');") % ipPort)); + return true; + }// else we already had this peer + }else std::cout << "Error saving Peer" << std::endl; + + return false; +} + bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) { Database* db = theApp->getWalletDB()->getDB(); @@ -247,8 +285,11 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& na) { mConnectedMap[na] = peer; bSuccess = true; + + savePeer(peer->getIP(),peer->getPort()); } + return bSuccess; } diff --git a/src/ConnectionPool.h b/src/ConnectionPool.h index 7d5f4a986..7516e1957 100644 --- a/src/ConnectionPool.h +++ b/src/ConnectionPool.h @@ -55,6 +55,8 @@ public: // // Peer connectivity notification. // + bool getTopNAddrs(int n,std::vector& addrs); + bool savePeer(const std::string& strIp, int iPort); // Inbound connection, false=reject bool peerRegister(Peer::pointer peer, const std::string& strIp, int iPort); @@ -93,5 +95,7 @@ public: #endif }; +extern void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort); + #endif // vim:ts=4 diff --git a/src/Peer.cpp b/src/Peer.cpp index e4ef4938b..3a7507e49 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -155,7 +155,7 @@ void Peer::connect(const std::string strIp, int iPort) } } -// We have an ecrypted connection to the peer. +// We have an encrypted connection to the peer. // Have it say who it is so we know to avoid redundant connections. // Establish that it really who we are talking to by having it sign a connection detail. // Also need to establish no man in the middle attack is in progress. @@ -202,7 +202,7 @@ void Peer::connected(const boost::system::error_code& error) mClientConnect = false; - if (iPort == SYSTEM_PEER_PORT) + if (iPort == SYSTEM_PEER_PORT) //TODO: Why are you doing this? iPort = -1; if (error) @@ -375,6 +375,22 @@ void Peer::processReadBuffer() else std::cerr << "parse error: " << type << std::endl; } break; + case newcoin::mtGET_PEERS: + { + newcoin::TMGetPeers msg; + if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + recvGetPeers(msg); + else std::cerr << "parse error: " << type << std::endl; + } + break; + case newcoin::mtPEERS: + { + newcoin::TMPeers msg; + if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + recvPeers(msg); + else std::cerr << "parse error: " << type << std::endl; + } + break; case newcoin::mtSEARCH_TRANSACTION: { @@ -537,7 +553,7 @@ void Peer::recvHello(newcoin::TMHello& packet) else { // At this point we could add the inbound connection to our IP list. However, the inbound IP address might be that of - // a NAT. It would be best to only add it if and only if we can immediatly verify it. + // a NAT. It would be best to only add it if and only if we can immediately verify it. nothing(); } @@ -564,6 +580,8 @@ void Peer::recvHello(newcoin::TMHello& packet) mNodePublic.clear(); detach("recvh"); } + + sendGetPeers(); } void Peer::recvTransaction(newcoin::TMTransaction& packet) @@ -665,6 +683,45 @@ void Peer::recvGetContacts(newcoin::TMGetContacts& packet) { } +// return a list of your favorite people +void Peer::recvGetPeers(newcoin::TMGetPeers& packet) +{ + std::vector addrs; + theApp->getConnectionPool().getTopNAddrs(30,addrs); + newcoin::TMPeers peers; + + for(int n=0; nset_ipv4(inet_addr(strIP.c_str())); + addr->set_ipv4port(port); + + std::cout << "Teaching about: " << strIP << std::endl; + } + + + PackedMessage::pointer message = boost::make_shared(peers, newcoin::mtPEERS); + sendPacket(message); +} +void Peer::recvPeers(newcoin::TMPeers& packet) +{ + for(int i = 0; i < packet.nodes().size(); ++i) + { + in_addr addr; + addr.s_addr=packet.nodes(i).ipv4(); + std::string strIP( inet_ntoa(addr)); + int port=packet.nodes(i).ipv4port(); + + std::cout << "Learning about: " << strIP << std::endl; + + theApp->getConnectionPool().savePeer(strIP,port); + } + +} + void Peer::recvIndexedObject(newcoin::TMIndexedObject& packet) { } @@ -958,6 +1015,16 @@ void Peer::sendHello() sendPacket(packet); } +void Peer::sendGetPeers() +{ + // get other peers this guy knows about + newcoin::TMGetPeers getPeers; + getPeers.set_doweneedthis(1); + PackedMessage::pointer packet = boost::make_shared(getPeers, newcoin::mtGET_PEERS); + sendPacket(packet); +} + + void Peer::punishPeer(PeerPunish) { } diff --git a/src/Peer.h b/src/Peer.h index 7e9601222..82eb997e7 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -56,6 +56,8 @@ protected: Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx); + + void handle_write(const boost::system::error_code& error, size_t bytes_transferred); //void handle_read(const boost::system::error_code& error, size_t bytes_transferred); void handle_read_header(const boost::system::error_code& error); @@ -74,6 +76,8 @@ protected: void recvGetValidation(newcoin::TMGetValidations& packet); void recvContact(newcoin::TMContact& packet); void recvGetContacts(newcoin::TMGetContacts& packet); + void recvGetPeers(newcoin::TMGetPeers& packet); + void recvPeers(newcoin::TMPeers& packet); void recvIndexedObject(newcoin::TMIndexedObject& packet); void recvGetObjectByHash(newcoin::TMGetObjectByHash& packet); void recvObjectByHash(newcoin::TMObjectByHash& packet); @@ -95,6 +99,9 @@ public: //bool operator == (const Peer& other); + std::string& getIP(){ return(mIpPort.first); } + int getPort(){ return(mIpPort.second); } + static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) { return pointer(new Peer(io_service, ctx)); @@ -115,6 +122,7 @@ public: void sendLedgerProposal(Ledger::pointer ledger); void sendFullLedger(Ledger::pointer ledger); void sendGetFullLedger(uint256& hash); + void sendGetPeers(); void punishPeer(PeerPunish pp); diff --git a/src/UniqueNodeList.cpp b/src/UniqueNodeList.cpp index 9e6aff99f..72730a17d 100644 --- a/src/UniqueNodeList.cpp +++ b/src/UniqueNodeList.cpp @@ -186,7 +186,7 @@ void UniqueNodeList::scoreCompute() { if (db->getNull("PublicKey")) { - nothing(); // We ignore entrys we don't have public keys for. + nothing(); // We ignore entries we don't have public keys for. } else { @@ -401,7 +401,7 @@ void UniqueNodeList::scoreCompute() } } - // For each validator, get each referal and add its score to ip's score. + // For each validator, get each referral and add its score to ip's score. // map of pair :: score epScore umScore; @@ -1196,7 +1196,7 @@ void UniqueNodeList::setSeedDomains(const seedDomain& sdSource, bool bNext) // Add a trusted node. Called by RPC or other source. // XXX allow update of comment. -// XXX Broken should opperate on seeds. +// XXX Broken should operate on seeds. void UniqueNodeList::nodeAddPublic(const NewcoinAddress& naNodePublic, const std::string& strComment) { std::string strPublicKey = naNodePublic.humanNodePublic(); @@ -1216,7 +1216,7 @@ void UniqueNodeList::nodeAddPublic(const NewcoinAddress& naNodePublic, const std } } -// XXX Broken should opperate on seeds. +// XXX Broken should operate on seeds. void UniqueNodeList::nodeRemove(NewcoinAddress naNodePublic) { std::string strPublicKey = naNodePublic.humanNodePublic(); diff --git a/src/newcoin.proto b/src/newcoin.proto index 2e0fa5b67..38be08e20 100644 --- a/src/newcoin.proto +++ b/src/newcoin.proto @@ -9,6 +9,8 @@ enum MessageType { // network presence detection mtGET_CONTACTS= 10; mtCONTACT= 11; + mtGET_PEERS= 12; + mtPEERS= 13; // operations for 'small' nodes mtSEARCH_TRANSACTION= 20; @@ -150,6 +152,19 @@ message TMGetContacts { optional uint32 nodeCount =2; // get some random nodes } +message TMGetPeers { + required uint32 doWeNeedThis =1; // yes since you are asserting that the packet size isn't 0 in PackedMessage +} + +message TMIPv4EndPoint { + required uint32 ipv4 = 1; + required uint32 ipv4Port = 2; +} + +message TMPeers { + repeated TMIPv4EndPoint nodes =1; +} + message TMSearchTransaction { required uint32 maxTrans =1; diff --git a/src/utils.cpp b/src/utils.cpp index ffe440618..742c91cff 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -102,6 +102,23 @@ DH* DH_der_load_hex(const std::string& strDer) return DH_der_load(strBuf); } +/* +void intIPtoStr(int ip,std::string& retStr) +{ + unsigned char bytes[4]; + bytes[0] = ip & 0xFF; + bytes[1] = (ip >> 8) & 0xFF; + bytes[2] = (ip >> 16) & 0xFF; + bytes[3] = (ip >> 24) & 0xFF; + + retStr=str(boost::format("%d.%d.%d.%d") % bytes[3] % bytes[2] % bytes[1] % bytes[0] ); +} + +int strIPtoInt(std::string& ipStr) +{ + +} +*/ #ifdef WIN32 #define _WINSOCK_ #include diff --git a/src/utils.h b/src/utils.h index 721fbff87..b7e888d4b 100644 --- a/src/utils.h +++ b/src/utils.h @@ -28,6 +28,11 @@ boost::posix_time::ptime ptEpoch(); int iToSeconds(boost::posix_time::ptime ptWhen); boost::posix_time::ptime ptFromSeconds(int iSeconds); +/* +void intIPtoStr(int ip,std::string& retStr); +int strIPtoInt(std::string& ipStr); +*/ + template std::string strJoin(Iterator first, Iterator last, std::string strSeperator) {