Fix peer tracking and scaning model.

This commit is contained in:
Arthur Britto
2012-06-19 12:26:15 -07:00
parent a7f8cd1a45
commit ce16e58c62
5 changed files with 220 additions and 125 deletions

View File

@@ -49,13 +49,24 @@ void ConnectionPool::start()
bool ConnectionPool::getTopNAddrs(int n,std::vector<std::string>& addrs)
{
Database* db = theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps limit %d") % n) )
// XXX Filter out other local addresses (like ipv6)
if (!theConfig.PEER_IP.empty() && theConfig.PEER_IP != "127.0.0.1")
{
std::string str;
db->getStr(0,str);
addrs.push_back(str);
addrs.push_back(str(boost::format("%s %d") % theConfig.PEER_IP % theConfig.PEER_PORT));
}
{
Database* db = theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps LIMIT %d") % n) )
{
std::string str;
db->getStr(0,str);
addrs.push_back(str);
}
}
return true;
@@ -65,15 +76,15 @@ bool ConnectionPool::savePeer(const std::string& strIp, int iPort,char code)
{
Database* db = theApp->getWalletDB()->getDB();
std::string ipPort=db->escape(str(boost::format("%s %d") % strIp % iPort));
std::string ipPort= sqlEscape(str(boost::format("%s %d") % strIp % iPort));
ScopedLock sl(theApp->getWalletDB()->getDBLock());
std::string sql=str(boost::format("SELECT count(*) FROM PeerIps WHERE IpPort=%s;") % ipPort);
std::string sql=str(boost::format("SELECT COUNT(*) FROM PeerIps WHERE IpPort=%s;") % ipPort);
if (db->executeSQL(sql) && db->startIterRows())
{
if ( db->getInt(0)==0)
{
db->executeSQL(str(boost::format("INSERT INTO PeerIps (IpPort,Score,Source) values (%s,0,'%c');") % ipPort % code));
db->executeSQL(str(boost::format("INSERT INTO PeerIps (IpPort,Score,Source) values (%s,0,'%c');") % ipPort % code));
return true;
}// else we already had this peer
}
@@ -151,7 +162,7 @@ void ConnectionPool::policyLowWater()
else
{
// Try to start connection.
if (!connectTo(strIp, iPort))
if (!peerConnect(strIp, iPort))
throw std::runtime_error("Internal error: standby was already connected.");
// Check if we need more.
@@ -198,7 +209,6 @@ void ConnectionPool::policyHandler(const boost::system::error_code& ecResult)
}
}
// XXX Broken: also don't send a message to a peer if we got it from the peer.
void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg)
{
boost::mutex::scoped_lock sl(mPeerLock);
@@ -213,40 +223,27 @@ void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg)
}
}
// Inbound connection, false=reject
// Reject addresses we already have in our table.
// XXX Reject, if we have too many connections.
bool ConnectionPool::peerRegister(Peer::pointer peer, const std::string& strIp, int iPort)
// Add or modify into PeerIps as a manual entry for immediate scanning.
// Requires sane IP and port.
void ConnectionPool::connectTo(const std::string& strIp, int iPort)
{
bool bAccept;
ipPort ip = make_pair(strIp, iPort);
Database* db = theApp->getWalletDB()->getDB();
std::string ipPort = sqlEscape(str(boost::format("%s %d") % strIp % iPort));
boost::unordered_map<ipPort, Peer::pointer>::iterator it;
boost::mutex::scoped_lock sl(mPeerLock);
it = mIpMap.find(ip);
if (it == mIpMap.end())
{
// Did not find it. Not already connecting or connected.
ScopedLock sl(theApp->getWalletDB()->getDBLock());
std::cerr << "ConnectionPool::peerRegister: " << ip.first << " " << ip.second << std::endl;
// Mark as connecting.
mIpMap[ip] = peer;
bAccept = true;
}
else
{
// Found it. Already connected or connecting.
bAccept = false;
db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IpPort,Score,Source,ScanNext) values (%s,%d,'%c',0);")
% ipPort
% theApp->getUNL().iSourceScore(UniqueNodeList::vsManual)
% char(UniqueNodeList::vsManual)));
}
return bAccept;
scanRefresh();
}
bool ConnectionPool::connectTo(const std::string& strIp, int iPort)
// <-- true, if already connected.
bool ConnectionPool::peerConnect(const std::string& strIp, int iPort)
{
bool bConnecting;
ipPort ip = make_pair(strIp, iPort);
@@ -260,7 +257,7 @@ bool ConnectionPool::connectTo(const std::string& strIp, int iPort)
if (it == mIpMap.end())
{
// Did not find it. Not already connecting or connected.
std::cerr << "ConnectionPool::connectTo: Connecting: "
std::cerr << "ConnectionPool::peerConnect: Connecting: "
<< strIp << " " << iPort << std::endl;
Peer::pointer peer(Peer::create(theApp->getIOService(), mCtx));
@@ -276,7 +273,7 @@ bool ConnectionPool::connectTo(const std::string& strIp, int iPort)
else
{
// Found it. Already connected.
std::cerr << "ConnectionPool::connectTo: Already connected: "
std::cerr << "ConnectionPool::peerConnect: Already connected: "
<< strIp << " " << iPort << std::endl;
bConnecting = false;
@@ -316,15 +313,15 @@ std::vector<Peer::pointer> ConnectionPool::getPeerVector()
}
// Now know peer's node public key. Determine if we want to stay connected.
bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& na)
bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naPeer, const std::string& strIP, int iPort)
{
bool bSuccess;
std::cerr << "ConnectionPool::peerConnected: " << na.humanNodePublic()
<< " " << peer->getIP() << " " << peer->getPort()
<< std::endl;
std::cerr << "ConnectionPool::peerConnected: "
<< naPeer.humanNodePublic() << " " << strIP << " " << iPort << std::endl;
assert(!!peer);
if (na == theApp->getWallet().getNodePublic())
if (naPeer == theApp->getWallet().getNodePublic())
{
std::cerr << "ConnectionPool::peerConnected: To self." << std::endl;
bSuccess = false;
@@ -332,25 +329,42 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& na)
else
{
boost::mutex::scoped_lock sl(mPeerLock);
boost::unordered_map<NewcoinAddress, Peer::pointer>::iterator itCm = mConnectedMap.find(naPeer);
mConnectedMap[na] = peer;
bSuccess = true;
if (itCm == mConnectedMap.end())
{
// New connection.
mConnectedMap[naPeer] = peer;
bSuccess = true;
}
else
{
// Found in map, already connected.
if (!strIP.empty())
{
// Was an outbound connection, we know IP and port.
// Note in previous connection how to reconnect.
itCm->second->peerIpPort(strIP, iPort);
}
bSuccess = false; // Don't need a redundant connection.
}
}
return bSuccess;
}
void ConnectionPool::peerDisconnected(Peer::pointer peer, const ipPort& ipPeer, const NewcoinAddress& naPeer)
// We maintain a map of public key to peer for connectted and verified peers. Maintain it.
void ConnectionPool::peerDisconnected(Peer::pointer peer, const NewcoinAddress& naPeer)
{
std::cerr << "ConnectionPool::peerDisconnected: " << ipPeer.first << " " << ipPeer.second << std::endl;
boost::mutex::scoped_lock sl(mPeerLock);
std::cerr << "ConnectionPool::peerDisconnected: " << peer->getIP() << " " << peer->getPort() << std::endl;
if (naPeer.isValid())
{
boost::unordered_map<NewcoinAddress, Peer::pointer>::iterator itCm;
boost::mutex::scoped_lock sl(mPeerLock);
itCm = mConnectedMap.find(naPeer);
if (itCm == mConnectedMap.end())
@@ -365,27 +379,53 @@ void ConnectionPool::peerDisconnected(Peer::pointer peer, const ipPort& ipPeer,
mConnectedMap.erase(itCm);
}
}
}
boost::unordered_map<ipPort, Peer::pointer>::iterator itIp;
void ConnectionPool::peerScanSet(const std::string& strIp, int iPort)
{
std::cerr << "ConnectionPool::peerScanSet: " << strIp << " " << iPort << std::endl;
itIp = mIpMap.find(ipPeer);
std::string strIpPort = str(boost::format("%s %d") % strIp % iPort);
if (itIp == mIpMap.end())
ScopedLock sl(theApp->getWalletDB()->getDBLock());
Database* db = theApp->getWalletDB()->getDB();
if (db->executeSQL(str(boost::format("SELECT ScanNext FROM PeerIps WHERE IpPort=%s;")
% sqlEscape(strIpPort)))
&& db->startIterRows())
{
// Did not find it. Not already connecting or connected.
std::cerr << "Internal Error: peer wasn't connected: "
<< ipPeer.first << " " << ipPeer.second << std::endl;
// XXX Bad error.
if (db->getNull("ScanNext"))
{
// Non-scanning connection terminated. Schedule for scanning.
int iInterval = theConfig.PEER_SCAN_INTERVAL_MIN;
boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time();
boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval);
std::cerr << str(boost::format("peerScanSet: scan schedule: %s %s (next %s, delay=%s)")
% mScanIp % mScanPort % tpNext % iInterval) << std::endl;
db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;")
% iToSeconds(tpNext)
% iInterval
% db->escape(strIpPort)));
}
else
{
// Scanning connection terminate, already scheduled for retry.
nothing();
}
}
else
{
// Found it. Delete it.
mIpMap.erase(itIp);
std::cerr << "peerScanSet: peer wasn't in PeerIps: " << strIp << " " << iPort << std::endl;
}
}
void ConnectionPool::peerFailed(const std::string& strIp, int iPort)
{
std::cerr << "ConnectionPool::peerFailed: " << strIp << " " << iPort << std::endl;
ipPort ipPeer = make_pair(strIp, iPort);
// If the fail was our scan, we are no longer scanning.
if (bScanning && !mScanIp.compare(strIp) && mScanPort == iPort)
{
@@ -394,6 +434,36 @@ void ConnectionPool::peerFailed(const std::string& strIp, int iPort)
// Look for more to scan.
scanRefresh();
}
bool bScanSet = false;
{
boost::mutex::scoped_lock sl(mPeerLock);
boost::unordered_map<ipPort, Peer::pointer>::iterator itIp;
itIp = mIpMap.find(ipPeer);
if (itIp == mIpMap.end())
{
// Did not find it. Not already connecting or connected.
std::cerr << "Internal Error: peer wasn't connected: "
<< ipPeer.first << " " << ipPeer.second << std::endl;
// XXX Bad error.
}
else
{
// Found it. Delete it.
mIpMap.erase(itIp);
bScanSet = true;
}
}
if (bScanSet)
{
// Schedule for scanning.
peerScanSet(ipPeer.first, ipPeer.second);
}
}
void ConnectionPool::peerVerified(const std::string& strIp, int iPort)
@@ -401,6 +471,9 @@ void ConnectionPool::peerVerified(const std::string& strIp, int iPort)
if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort)
{
std::string strIpPort = str(boost::format("%s %d") % strIp % iPort);
std::cerr << str(boost::format("peerVerified: %s %s (scan off)") % mScanIp % mScanPort) << std::endl;
// Scan completed successfully.
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
@@ -486,7 +559,6 @@ void ConnectionPool::scanRefresh()
(void) mScanTimer.cancel();
std::cerr << "scanRefresh: scanning: " << mScanIp << " " << mScanPort << std::endl;
bScanning = true;
iInterval *= 2;
@@ -494,6 +566,9 @@ void ConnectionPool::scanRefresh()
tpNext = tpNow + boost::posix_time::seconds(iInterval);
std::cerr << str(boost::format("scanRefresh: now scanning: %s %s (next %s, delay=%s)")
% mScanIp % mScanPort % tpNext % iInterval) << std::endl;
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
Database *db=theApp->getWalletDB()->getDB();
@@ -505,7 +580,7 @@ void ConnectionPool::scanRefresh()
// XXX Check error.
}
if (!connectTo(mScanIp, mScanPort))
if (!peerConnect(mScanIp, mScanPort))
{
// Already connected. Try again.
scanRefresh();