maintain db of peers

This commit is contained in:
jed
2012-06-13 08:20:08 -07:00
parent 253df289c3
commit f7b5753be1
9 changed files with 167 additions and 10 deletions

View File

@@ -13,7 +13,7 @@
#include "key.h"
#include "utils.h"
#include "TaggedCache.h"
#include "boost/filesystem.hpp"
Application* theApp = NULL;
@@ -21,7 +21,7 @@ DatabaseCon::DatabaseCon(const std::string& strName, const char *initStrings[],
{
boost::filesystem::path pPath = theConfig.DATA_DIR / strName;
mDatabase = new SqliteDatabase(pPath.c_str());
mDatabase = new SqliteDatabase((const char*) pPath.c_str());
mDatabase->connect();
for(int i = 0; i < initCount; ++i)
mDatabase->executeSQL(initStrings[i], true);

View File

@@ -11,7 +11,11 @@
#include <boost/format.hpp>
#include <boost/algorithm/string.hpp>
static void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort)
#define SQL_FOREACH(_db, _strQuery) \
if ((_db)->executeSQL(_strQuery)) \
for (bool _bMore = (db)->startIterRows(); _bMore; _bMore = (_db)->getNextRow())
void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort)
{
std::vector<std::string> vIpPort;
boost::split(vIpPort, strIpPort, boost::is_any_of(" "));
@@ -43,6 +47,40 @@ void ConnectionPool::start()
scanRefresh();
}
bool ConnectionPool::getTopNAddrs(int n,std::vector<std::string>& addrs)
{
Database* db = theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps limit %d") % n) )
{
std::string str;
db->getStr(0,str);
addrs.push_back(str);
}
return true;
}
bool ConnectionPool::savePeer(const std::string& strIp, int iPort)
{
Database* db = theApp->getWalletDB()->getDB();
std::string ipPort=db->escape(str(boost::format("%s %d") % strIp % iPort));
ScopedLock sl(theApp->getWalletDB()->getDBLock());
std::string sql=str(boost::format("SELECT count(*) FROM PeerIps WHERE IpPort=%s;") % ipPort);
if(db->executeSQL(sql) && db->startIterRows())
{
if( db->getInt(0)==0)
{
db->executeSQL(str(boost::format("INSERT INTO PeerIps (IpPort,Score,Source) values (%s,0,'a');") % ipPort));
return true;
}// else we already had this peer
}else std::cout << "Error saving Peer" << std::endl;
return false;
}
bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort)
{
Database* db = theApp->getWalletDB()->getDB();
@@ -247,8 +285,11 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& na)
{
mConnectedMap[na] = peer;
bSuccess = true;
savePeer(peer->getIP(),peer->getPort());
}
return bSuccess;
}

View File

@@ -55,6 +55,8 @@ public:
//
// Peer connectivity notification.
//
bool getTopNAddrs(int n,std::vector<std::string>& addrs);
bool savePeer(const std::string& strIp, int iPort);
// Inbound connection, false=reject
bool peerRegister(Peer::pointer peer, const std::string& strIp, int iPort);
@@ -93,5 +95,7 @@ public:
#endif
};
extern void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort);
#endif
// vim:ts=4

View File

@@ -155,7 +155,7 @@ void Peer::connect(const std::string strIp, int iPort)
}
}
// We have an ecrypted connection to the peer.
// We have an encrypted connection to the peer.
// Have it say who it is so we know to avoid redundant connections.
// Establish that it really who we are talking to by having it sign a connection detail.
// Also need to establish no man in the middle attack is in progress.
@@ -202,7 +202,7 @@ void Peer::connected(const boost::system::error_code& error)
mClientConnect = false;
if (iPort == SYSTEM_PEER_PORT)
if (iPort == SYSTEM_PEER_PORT) //TODO: Why are you doing this?
iPort = -1;
if (error)
@@ -375,6 +375,22 @@ void Peer::processReadBuffer()
else std::cerr << "parse error: " << type << std::endl;
}
break;
case newcoin::mtGET_PEERS:
{
newcoin::TMGetPeers msg;
if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvGetPeers(msg);
else std::cerr << "parse error: " << type << std::endl;
}
break;
case newcoin::mtPEERS:
{
newcoin::TMPeers msg;
if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
recvPeers(msg);
else std::cerr << "parse error: " << type << std::endl;
}
break;
case newcoin::mtSEARCH_TRANSACTION:
{
@@ -537,7 +553,7 @@ void Peer::recvHello(newcoin::TMHello& packet)
else
{
// At this point we could add the inbound connection to our IP list. However, the inbound IP address might be that of
// a NAT. It would be best to only add it if and only if we can immediatly verify it.
// a NAT. It would be best to only add it if and only if we can immediately verify it.
nothing();
}
@@ -564,6 +580,8 @@ void Peer::recvHello(newcoin::TMHello& packet)
mNodePublic.clear();
detach("recvh");
}
sendGetPeers();
}
void Peer::recvTransaction(newcoin::TMTransaction& packet)
@@ -665,6 +683,45 @@ void Peer::recvGetContacts(newcoin::TMGetContacts& packet)
{
}
// return a list of your favorite people
void Peer::recvGetPeers(newcoin::TMGetPeers& packet)
{
std::vector<std::string> addrs;
theApp->getConnectionPool().getTopNAddrs(30,addrs);
newcoin::TMPeers peers;
for(int n=0; n<addrs.size(); n++)
{
std::string strIP;
int port;
splitIpPort(addrs[n],strIP,port);
newcoin::TMIPv4EndPoint* addr=peers.add_nodes();
addr->set_ipv4(inet_addr(strIP.c_str()));
addr->set_ipv4port(port);
std::cout << "Teaching about: " << strIP << std::endl;
}
PackedMessage::pointer message = boost::make_shared<PackedMessage>(peers, newcoin::mtPEERS);
sendPacket(message);
}
void Peer::recvPeers(newcoin::TMPeers& packet)
{
for(int i = 0; i < packet.nodes().size(); ++i)
{
in_addr addr;
addr.s_addr=packet.nodes(i).ipv4();
std::string strIP( inet_ntoa(addr));
int port=packet.nodes(i).ipv4port();
std::cout << "Learning about: " << strIP << std::endl;
theApp->getConnectionPool().savePeer(strIP,port);
}
}
void Peer::recvIndexedObject(newcoin::TMIndexedObject& packet)
{
}
@@ -958,6 +1015,16 @@ void Peer::sendHello()
sendPacket(packet);
}
void Peer::sendGetPeers()
{
// get other peers this guy knows about
newcoin::TMGetPeers getPeers;
getPeers.set_doweneedthis(1);
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(getPeers, newcoin::mtGET_PEERS);
sendPacket(packet);
}
void Peer::punishPeer(PeerPunish)
{
}

View File

@@ -56,6 +56,8 @@ protected:
Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx);
void handle_write(const boost::system::error_code& error, size_t bytes_transferred);
//void handle_read(const boost::system::error_code& error, size_t bytes_transferred);
void handle_read_header(const boost::system::error_code& error);
@@ -74,6 +76,8 @@ protected:
void recvGetValidation(newcoin::TMGetValidations& packet);
void recvContact(newcoin::TMContact& packet);
void recvGetContacts(newcoin::TMGetContacts& packet);
void recvGetPeers(newcoin::TMGetPeers& packet);
void recvPeers(newcoin::TMPeers& packet);
void recvIndexedObject(newcoin::TMIndexedObject& packet);
void recvGetObjectByHash(newcoin::TMGetObjectByHash& packet);
void recvObjectByHash(newcoin::TMObjectByHash& packet);
@@ -95,6 +99,9 @@ public:
//bool operator == (const Peer& other);
std::string& getIP(){ return(mIpPort.first); }
int getPort(){ return(mIpPort.second); }
static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx)
{
return pointer(new Peer(io_service, ctx));
@@ -115,6 +122,7 @@ public:
void sendLedgerProposal(Ledger::pointer ledger);
void sendFullLedger(Ledger::pointer ledger);
void sendGetFullLedger(uint256& hash);
void sendGetPeers();
void punishPeer(PeerPunish pp);

View File

@@ -186,7 +186,7 @@ void UniqueNodeList::scoreCompute()
{
if (db->getNull("PublicKey"))
{
nothing(); // We ignore entrys we don't have public keys for.
nothing(); // We ignore entries we don't have public keys for.
}
else
{
@@ -401,7 +401,7 @@ void UniqueNodeList::scoreCompute()
}
}
// For each validator, get each referal and add its score to ip's score.
// For each validator, get each referral and add its score to ip's score.
// map of pair<IP,Port> :: score
epScore umScore;
@@ -1196,7 +1196,7 @@ void UniqueNodeList::setSeedDomains(const seedDomain& sdSource, bool bNext)
// Add a trusted node. Called by RPC or other source.
// XXX allow update of comment.
// XXX Broken should opperate on seeds.
// XXX Broken should operate on seeds.
void UniqueNodeList::nodeAddPublic(const NewcoinAddress& naNodePublic, const std::string& strComment)
{
std::string strPublicKey = naNodePublic.humanNodePublic();
@@ -1216,7 +1216,7 @@ void UniqueNodeList::nodeAddPublic(const NewcoinAddress& naNodePublic, const std
}
}
// XXX Broken should opperate on seeds.
// XXX Broken should operate on seeds.
void UniqueNodeList::nodeRemove(NewcoinAddress naNodePublic)
{
std::string strPublicKey = naNodePublic.humanNodePublic();

View File

@@ -9,6 +9,8 @@ enum MessageType {
// network presence detection
mtGET_CONTACTS= 10;
mtCONTACT= 11;
mtGET_PEERS= 12;
mtPEERS= 13;
// operations for 'small' nodes
mtSEARCH_TRANSACTION= 20;
@@ -150,6 +152,19 @@ message TMGetContacts {
optional uint32 nodeCount =2; // get some random nodes
}
message TMGetPeers {
required uint32 doWeNeedThis =1; // yes since you are asserting that the packet size isn't 0 in PackedMessage
}
message TMIPv4EndPoint {
required uint32 ipv4 = 1;
required uint32 ipv4Port = 2;
}
message TMPeers {
repeated TMIPv4EndPoint nodes =1;
}
message TMSearchTransaction {
required uint32 maxTrans =1;

View File

@@ -102,6 +102,23 @@ DH* DH_der_load_hex(const std::string& strDer)
return DH_der_load(strBuf);
}
/*
void intIPtoStr(int ip,std::string& retStr)
{
unsigned char bytes[4];
bytes[0] = ip & 0xFF;
bytes[1] = (ip >> 8) & 0xFF;
bytes[2] = (ip >> 16) & 0xFF;
bytes[3] = (ip >> 24) & 0xFF;
retStr=str(boost::format("%d.%d.%d.%d") % bytes[3] % bytes[2] % bytes[1] % bytes[0] );
}
int strIPtoInt(std::string& ipStr)
{
}
*/
#ifdef WIN32
#define _WINSOCK_
#include <winsock2.h>

View File

@@ -28,6 +28,11 @@ boost::posix_time::ptime ptEpoch();
int iToSeconds(boost::posix_time::ptime ptWhen);
boost::posix_time::ptime ptFromSeconds(int iSeconds);
/*
void intIPtoStr(int ip,std::string& retStr);
int strIPtoInt(std::string& ipStr);
*/
template<class Iterator>
std::string strJoin(Iterator first, Iterator last, std::string strSeperator)
{