Fixes and clean up for peer management.

This commit is contained in:
Arthur Britto
2012-06-20 13:40:03 -07:00
parent 71e6921a61
commit a2c5b90fe9
4 changed files with 278 additions and 196 deletions

View File

@@ -28,7 +28,6 @@ void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort)
ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) :
mCtx(boost::asio::ssl::context::sslv23),
bScanning(false),
mScanTimer(io_service),
mPolicyTimer(io_service)
{
@@ -53,36 +52,29 @@ void ConnectionPool::start()
bool ConnectionPool::getTopNAddrs(int n,std::vector<std::string>& addrs)
{
// XXX Filter out other local addresses (like ipv6)
if (!theConfig.PEER_IP.empty() && theConfig.PEER_IP != "127.0.0.1")
Database* db = theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps LIMIT %d") % n) )
{
addrs.push_back(str(boost::format("%s %d") % theConfig.PEER_IP % theConfig.PEER_PORT));
}
std::string str;
{
Database* db = theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
db->getStr(0,str);
SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps LIMIT %d") % n) )
{
std::string str;
db->getStr(0,str);
addrs.push_back(str);
}
addrs.push_back(str);
}
return true;
}
bool ConnectionPool::savePeer(const std::string& strIp, int iPort,char code)
bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code)
{
Database* db = theApp->getWalletDB()->getDB();
std::string ipPort= sqlEscape(str(boost::format("%s %d") % strIp % iPort));
std::string ipPort = sqlEscape(str(boost::format("%s %d") % strIp % iPort));
ScopedLock sl(theApp->getWalletDB()->getDBLock());
std::string sql=str(boost::format("SELECT COUNT(*) FROM PeerIps WHERE IpPort=%s;") % ipPort);
std::string sql = str(boost::format("SELECT COUNT(*) FROM PeerIps WHERE IpPort=%s;") % ipPort);
if (db->executeSQL(sql) && db->startIterRows())
{
if ( db->getInt(0)==0)
@@ -99,6 +91,9 @@ bool ConnectionPool::savePeer(const std::string& strIp, int iPort,char code)
return false;
}
// An available peer is one we had no trouble connect to last time and that we are not currently knowingly connected or connecting
// too.
//
// <-- true, if a peer is available to connect to
bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort)
{
@@ -108,6 +103,7 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort)
// Convert mIpMap (list of open connections) to a vector of "<ip> <port>".
{
boost::mutex::scoped_lock sl(mPeerLock);
vstrIpPort.reserve(mIpMap.size());
BOOST_FOREACH(pipPeer ipPeer, mIpMap)
@@ -115,19 +111,22 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort)
const std::string& strIp = ipPeer.first.first;
int iPort = ipPeer.first.second;
vstrIpPort.push_back(db->escape(str(boost::format("%s %d") % strIp % iPort)));
vstrIpPort.push_back(sqlEscape(str(boost::format("%s %d") % strIp % iPort)));
}
}
// Get the first IpPort entry which is not in vector and which is not scheduled for scanning.
std::string strIpPort;
ScopedLock sl(theApp->getWalletDB()->getDBLock());
if (db->executeSQL(str(boost::format("SELECT IpPort FROM PeerIps WHERE ScanNext IS NULL AND IpPort NOT IN (%s) LIMIT 1;")
% strJoin(vstrIpPort.begin(), vstrIpPort.end(), ",")))
&& db->startIterRows())
{
db->getStr("IpPort", strIpPort);
ScopedLock sl(theApp->getWalletDB()->getDBLock());
if (db->executeSQL(str(boost::format("SELECT IpPort FROM PeerIps WHERE ScanNext IS NULL AND IpPort NOT IN (%s) LIMIT 1;")
% strJoin(vstrIpPort.begin(), vstrIpPort.end(), ",")))
&& db->startIterRows())
{
db->getStr("IpPort", strIpPort);
}
}
bool bAvailable = !strIpPort.empty();
@@ -138,6 +137,7 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort)
return bAvailable;
}
// Make sure we have at least low water connections.
void ConnectionPool::policyLowWater()
{
std::string strIp;
@@ -166,7 +166,7 @@ void ConnectionPool::policyLowWater()
{
// Try to start connection.
if (!peerConnect(strIp, iPort))
throw std::runtime_error("Internal error: standby was already connected.");
Log(lsINFO) << "policyLowWater was already connected.";
// Check if we need more.
policyLowWater();
@@ -175,24 +175,14 @@ void ConnectionPool::policyLowWater()
void ConnectionPool::policyEnforce()
{
boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time();
Log(lsTRACE) << "policyEnforce: begin: " << tpNow;
// Cancel any in progrss timer.
// Cancel any in progress timer.
(void) mPolicyTimer.cancel();
// Enforce policies.
policyLowWater();
// Schedule next enforcement.
boost::posix_time::ptime tpNext;
tpNext = boost::posix_time::second_clock::universal_time()+boost::posix_time::seconds(POLICY_INTERVAL_SECONDS);
Log(lsTRACE) << "policyEnforce: schedule : " << tpNext;
mPolicyTimer.expires_at(tpNext);
mPolicyTimer.expires_at(boost::posix_time::second_clock::universal_time()+boost::posix_time::seconds(POLICY_INTERVAL_SECONDS));
mPolicyTimer.async_wait(boost::bind(&ConnectionPool::policyHandler, this, _1));
}
@@ -212,6 +202,8 @@ void ConnectionPool::policyHandler(const boost::system::error_code& ecResult)
}
}
// YYY: Should probably do this in the background.
// YYY: Might end up sending to disconnected peer?
void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg)
{
boost::mutex::scoped_lock sl(mPeerLock);
@@ -226,18 +218,18 @@ void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg)
}
}
// Schedule a connection via scanning.
//
// Add or modify into PeerIps as a manual entry for immediate scanning.
// Requires sane IP and port.
void ConnectionPool::connectTo(const std::string& strIp, int iPort)
{
Database* db = theApp->getWalletDB()->getDB();
std::string ipPort = sqlEscape(str(boost::format("%s %d") % strIp % iPort));
{
Database* db = theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IpPort,Score,Source,ScanNext) values (%s,%d,'%c',0);")
% ipPort
% sqlEscape(str(boost::format("%s %d") % strIp % iPort))
% theApp->getUNL().iSourceScore(UniqueNodeList::vsManual)
% char(UniqueNodeList::vsManual)));
}
@@ -245,46 +237,52 @@ void ConnectionPool::connectTo(const std::string& strIp, int iPort)
scanRefresh();
}
// Start a connection, if not already known connected or connecting.
//
// <-- true, if already connected.
bool ConnectionPool::peerConnect(const std::string& strIp, int iPort)
Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort)
{
bool bConnecting;
ipPort ip = make_pair(strIp, iPort);
ipPort pipPeer = make_pair(strIp, iPort);
Peer::pointer ppResult = Peer::pointer();
boost::unordered_map<ipPort, Peer::pointer>::iterator it;
boost::mutex::scoped_lock sl(mPeerLock);
it = mIpMap.find(ip);
if (it == mIpMap.end())
{
// Did not find it. Not already connecting or connected.
std::cerr << "ConnectionPool::peerConnect: Connecting: "
<< strIp << " " << iPort << std::endl;
boost::mutex::scoped_lock sl(mPeerLock);
Peer::pointer peer(Peer::create(theApp->getIOService(), mCtx));
if ((it = mIpMap.find(pipPeer)) == mIpMap.end())
{
Peer::pointer ppNew(Peer::create(theApp->getIOService(), mCtx));
mIpMap[ip] = peer;
// Did not find it. Not already connecting or connected.
ppNew->connect(strIp, iPort);
peer->connect(strIp, iPort);
mIpMap[pipPeer] = ppNew;
// ++miConnectStarting;
ppResult = ppNew;
// ++miConnectStarting;
}
else
{
// Found it. Already connected.
bConnecting = true;
nothing();
}
}
if (ppResult)
{
Log(lsINFO) << "Pool: Connecting: " << ADDRESS_SHARED(ppResult) << ": " << strIp << " " << iPort;
}
else
{
// Found it. Already connected.
std::cerr << "ConnectionPool::peerConnect: Already connected: "
<< strIp << " " << iPort << std::endl;
bConnecting = false;
Log(lsINFO) << "Pool: Already connected: " << strIp << " " << iPort;
}
return bConnecting;
return ppResult;
}
// Returns information on verified peers.
Json::Value ConnectionPool::getPeersJson()
{
Json::Value ret(Json::arrayValue);
@@ -316,18 +314,16 @@ std::vector<Peer::pointer> ConnectionPool::getPeerVector()
}
// Now know peer's node public key. Determine if we want to stay connected.
// <-- bNew: false = redundant
bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naPeer, const std::string& strIP, int iPort)
{
bool bSuccess;
bool bNew = false;
std::cerr << "ConnectionPool::peerConnected: "
<< naPeer.humanNodePublic() << " " << strIP << " " << iPort << std::endl;
assert(!!peer);
if (naPeer == theApp->getWallet().getNodePublic())
{
std::cerr << "ConnectionPool::peerConnected: To self." << std::endl;
bSuccess = false;
Log(lsINFO) << "Pool: Connected: self: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
}
else
{
@@ -337,31 +333,48 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naP
if (itCm == mConnectedMap.end())
{
// New connection.
Log(lsINFO) << "Pool: Connected: new: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
mConnectedMap[naPeer] = peer;
bSuccess = true;
bNew = true;
}
// Found in map, already connected.
else if (!strIP.empty())
{
// Was an outbound connection, we know IP and port.
// Note in previous connection how to reconnect.
if (itCm->second->getIP().empty())
{
// Old peer did not know it's IP.
Log(lsINFO) << "Pool: Connected: redundant: outbound: " << ADDRESS_SHARED(peer) << " discovered: " << ADDRESS_SHARED(itCm->second) << ": " << strIP << " " << iPort;
itCm->second->setIpPort(strIP, iPort);
// Add old connection to identified connection list.
mIpMap[make_pair(strIP, iPort)] = itCm->second;
}
else
{
// Old peer knew its IP. Do nothing.
Log(lsINFO) << "Pool: Connected: redundant: outbound: rediscovered: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
nothing();
}
}
else
{
// Found in map, already connected.
if (!strIP.empty())
{
// Was an outbound connection, we know IP and port.
// Note in previous connection how to reconnect.
itCm->second->peerIpPort(strIP, iPort);
}
Log(lsINFO) << "Pool: Connected: redundant: inbound: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
bSuccess = false; // Don't need a redundant connection.
nothing();
}
}
return bSuccess;
return bNew;
}
// We maintain a map of public key to peer for connectted and verified peers. Maintain it.
// We maintain a map of public key to peer for connected and verified peers. Maintain it.
void ConnectionPool::peerDisconnected(Peer::pointer peer, const NewcoinAddress& naPeer)
{
std::cerr << "ConnectionPool::peerDisconnected: " << peer->getIP() << " " << peer->getPort() << std::endl;
if (naPeer.isValid())
{
boost::unordered_map<NewcoinAddress, Peer::pointer>::iterator itCm;
@@ -373,22 +386,36 @@ void ConnectionPool::peerDisconnected(Peer::pointer peer, const NewcoinAddress&
if (itCm == mConnectedMap.end())
{
// Did not find it. Not already connecting or connected.
std::cerr << "Internal Error: peer connection was inconsistent." << std::endl;
// XXX Bad error.
Log(lsWARNING) << "Pool: disconnected: Internal Error: mConnectedMap was inconsistent.";
// XXX Maybe bad error, considering we have racing connections, may not so bad.
}
else if (itCm->second != peer)
{
Log(lsWARNING) << "Pool: disconected: non canonical entry";
nothing();
}
else
{
// Found it. Delete it.
mConnectedMap.erase(itCm);
Log(lsINFO) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort();
}
}
else
{
Log(lsINFO) << "Pool: disconnected: anonymous: " << peer->getIP() << " " << peer->getPort();
}
}
void ConnectionPool::peerScanSet(const std::string& strIp, int iPort)
// Schedule for immediate scanning, if not already scheduled.
//
// <-- true, scanRefresh needed.
bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort)
{
std::cerr << "ConnectionPool::peerScanSet: " << strIp << " " << iPort << std::endl;
std::string strIpPort = str(boost::format("%s %d") % strIp % iPort);
bool bScanDirty = false;
ScopedLock sl(theApp->getWalletDB()->getDBLock());
Database* db = theApp->getWalletDB()->getDB();
@@ -404,38 +431,48 @@ void ConnectionPool::peerScanSet(const std::string& strIp, int iPort)
boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time();
boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval);
std::cerr << str(boost::format("peerScanSet: scan schedule: %s %s (next %s, delay=%s)")
% mScanIp % mScanPort % tpNext % iInterval) << std::endl;
Log(lsINFO) << str(boost::format("Scanning: schedule create: %s %s (next %s, delay=%s)")
% mScanIp % mScanPort % tpNext % iInterval);
db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;")
% iToSeconds(tpNext)
% iInterval
% db->escape(strIpPort)));
bScanDirty = true;
}
else
{
// Scanning connection terminate, already scheduled for retry.
nothing();
// Scanning connection terminated, already scheduled for retry.
boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time();
boost::posix_time::ptime tpNext = ptFromSeconds(db->getInt("ScanNext"));
int iInterval = (tpNext-tpNow).seconds();
Log(lsINFO) << str(boost::format("Scanning: schedule exists: %s %s (next %s, delay=%s)")
% mScanIp % mScanPort % tpNext % iInterval);
}
}
else
{
std::cerr << "peerScanSet: peer wasn't in PeerIps: " << strIp << " " << iPort << std::endl;
Log(lsWARNING) << "Scanning: peer wasn't in PeerIps: " << strIp << " " << iPort;
}
return bScanDirty;
}
void ConnectionPool::peerFailed(const std::string& strIp, int iPort)
// --> strIp: not empty
void ConnectionPool::peerClosed(Peer::pointer peer, const std::string& strIp, int iPort)
{
std::cerr << "ConnectionPool::peerFailed: " << strIp << " " << iPort << std::endl;
ipPort ipPeer = make_pair(strIp, iPort);
ipPort ipPeer = make_pair(strIp, iPort);
bool bScanRefresh = false;
// If the fail was our scan, we are no longer scanning.
if (bScanning && !mScanIp.compare(strIp) && mScanPort == iPort)
// If the connecttion was our scan, we are no longer scanning.
if (mScanning && mScanning == peer)
{
bScanning = false;
Log(lsINFO) << "Scanning: scan fail: " << strIp << " " << iPort;
// Look for more to scan.
scanRefresh();
mScanning = Peer::pointer(); // No longer scanning.
bScanRefresh = true; // Look for more to scan.
}
bool bScanSet = false;
@@ -449,33 +486,47 @@ void ConnectionPool::peerFailed(const std::string& strIp, int iPort)
if (itIp == mIpMap.end())
{
// Did not find it. Not already connecting or connected.
std::cerr << "Internal Error: peer wasn't connected: "
<< ipPeer.first << " " << ipPeer.second << std::endl;
// XXX Bad error.
Log(lsWARNING) << "Pool: Disconnect: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
// XXX Internal error.
}
else if (mIpMap[ipPeer] == peer)
{
// We were the identified connection.
Log(lsINFO) << "Pool: Disconnect: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
// Delete our entry.
mIpMap.erase(itIp);
// We want to connect again.
bScanSet = true;
}
else
{
// Found it. Delete it.
mIpMap.erase(itIp);
bScanSet = true;
// Found it. But, we were redundent.
Log(lsINFO) << "Pool: Disconnect: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
}
}
if (bScanSet)
{
// Schedule for scanning.
peerScanSet(ipPeer.first, ipPeer.second);
// Since we disconnnected, try to schedule for scanning again.
bScanRefresh = peerScanSet(ipPeer.first, ipPeer.second);
}
if (bScanRefresh)
scanRefresh();
}
void ConnectionPool::peerVerified(const std::string& strIp, int iPort)
void ConnectionPool::peerVerified(Peer::pointer peer)
{
if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort)
if (mScanning && mScanning == peer)
{
std::string strIp = peer->getIP();
int iPort = peer->getPort();
std::string strIpPort = str(boost::format("%s %d") % strIp % iPort);
std::cerr << str(boost::format("peerVerified: %s %s (scan off)") % mScanIp % mScanPort) << std::endl;
Log(lsINFO) << str(boost::format("Scanning: connected: %s %s (scan off)") % strIp % iPort);
// Scan completed successfully.
{
@@ -487,8 +538,9 @@ void ConnectionPool::peerVerified(const std::string& strIp, int iPort)
// XXX Check error.
}
bScanning = false;
scanRefresh();
mScanning = Peer::pointer();
scanRefresh(); // Continue scanning.
}
}
@@ -511,10 +563,10 @@ void ConnectionPool::scanHandler(const boost::system::error_code& ecResult)
// Scan ips as per db entries.
void ConnectionPool::scanRefresh()
{
if (bScanning)
if (mScanning)
{
// Currently scanning, will scan again after completion.
std::cerr << "scanRefresh: already scanning" << std::endl;
Log(lsTRACE) << "Scanning: already scanning";
nothing();
}
@@ -527,8 +579,8 @@ void ConnectionPool::scanRefresh()
int iInterval;
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
Database *db=theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
Database* db = theApp->getWalletDB()->getDB();
if (db->executeSQL("SELECT * FROM PeerIps INDEXED BY PeerScanIndex WHERE ScanNext NOT NULL ORDER BY ScanNext LIMIT 1;")
&& db->startIterRows())
@@ -551,7 +603,7 @@ void ConnectionPool::scanRefresh()
if (tpNow.is_not_a_date_time())
{
std::cerr << "scanRefresh: no scan needed." << std::endl;
Log(lsINFO) << "Scanning: stop.";
(void) mScanTimer.cancel();
}
@@ -562,14 +614,13 @@ void ConnectionPool::scanRefresh()
(void) mScanTimer.cancel();
bScanning = true;
iInterval *= 2;
// XXX iInterval *= 2;
iInterval = 0;
iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN);
tpNext = tpNow + boost::posix_time::seconds(iInterval);
std::cerr << str(boost::format("scanRefresh: now scanning: %s %s (next %s, delay=%s)")
Log(lsTRACE) << str(boost::format("Scanning: %s %s (next %s, delay=%s)")
% mScanIp % mScanPort % tpNext % iInterval) << std::endl;
{
@@ -583,7 +634,8 @@ void ConnectionPool::scanRefresh()
// XXX Check error.
}
if (!peerConnect(mScanIp, mScanPort))
mScanning = peerConnect(mScanIp, mScanPort);
if (!mScanning)
{
// Already connected. Try again.
scanRefresh();
@@ -591,7 +643,7 @@ void ConnectionPool::scanRefresh()
}
else
{
std::cerr << "scanRefresh: next due: " << tpNow << std::endl;
Log(lsINFO) << "Scanning: next: " << tpNow;
mScanTimer.expires_at(tpNext);
mScanTimer.async_wait(boost::bind(&ConnectionPool::scanHandler, this, _1));