Implement basic low water connection policy.

This commit is contained in:
Arthur Britto
2012-05-01 00:36:42 -07:00
parent 785f9b056a
commit 43ac5f8023
5 changed files with 121 additions and 23 deletions

View File

@@ -9,6 +9,16 @@
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/foreach.hpp> #include <boost/foreach.hpp>
#include <boost/format.hpp> #include <boost/format.hpp>
#include <boost/algorithm/string.hpp>
static void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort)
{
std::vector<std::string> vIpPort;
boost::split(vIpPort, strIpPort, boost::is_any_of(" "));
strIp = vIpPort[0];
iPort = boost::lexical_cast<int>(vIpPort[1]);
}
ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) : ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) :
mCtx(boost::asio::ssl::context::sslv23), mCtx(boost::asio::ssl::context::sslv23),
@@ -27,11 +37,86 @@ ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) :
void ConnectionPool::start() void ConnectionPool::start()
{ {
// XXX Start running policy. // XXX Start running policy.
policyEnforce();
// Start scanning. // Start scanning.
scanRefresh(); scanRefresh();
} }
bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort)
{
Database* db = theApp->getWalletDB()->getDB();
std::vector<std::string> vstrIpPort;
vstrIpPort.reserve(mIpMap.size());
pipPeer ipPeer;
BOOST_FOREACH(ipPeer, mIpMap)
{
std::string& strIp = ipPeer.first.first;
int iPort = ipPeer.first.second;
vstrIpPort.push_back(db->escape(str(boost::format("%s %d") % strIp % iPort)));
}
std::string strIpPort;
ScopedLock sl(theApp->getWalletDB()->getDBLock());
if (db->executeSQL(str(boost::format("SELECT IpPort FROM PeerIps WHERE ScanNext IS NULL AND IpPort NOT IN (%s) LIMIT 1;")
% strJoin(vstrIpPort.begin(), vstrIpPort.end(), ",")))
&& db->startIterRows())
{
db->getStr("IpPort", strIpPort);
}
bool bAvailable = !strIpPort.empty();
if (bAvailable)
splitIpPort(strIpPort, strIp, iPort);
return bAvailable;
}
void ConnectionPool::policyLowWater()
{
std::string strIp;
int iPort;
// Find an entry to connect to.
if (mConnectedMap.size() > theConfig.PEER_CONNECT_LOW_WATER)
{
// Above low water mark, don't need more connections.
nothing();
}
#if 0
else if (miConnectStarting == theConfig.PEER_START_MAX)
{
// Too many connections starting to start another.
nothing();
}
#endif
else if (!peerAvailable(strIp, iPort))
{
// No more connections available to start.
// XXX Might ask peers for more ips.
nothing();
}
else
{
// Try to start connection.
if (!connectTo(strIp, iPort))
throw std::runtime_error("Internal error: standby was already connected.");
// Check if we need more.
policyLowWater();
}
}
void ConnectionPool::policyEnforce()
{
policyLowWater();
}
// XXX Broken: also don't send a message to a peer if we got it from the peer. // XXX Broken: also don't send a message to a peer if we got it from the peer.
void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg) void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg)
{ {
@@ -100,6 +185,8 @@ bool ConnectionPool::connectTo(const std::string& strIp, int iPort)
peer->connect(strIp, iPort); peer->connect(strIp, iPort);
// ++miConnectStarting;
bConnecting = true; bConnecting = true;
} }
else else
@@ -205,14 +292,14 @@ void ConnectionPool::peerVerified(const std::string& strIp, int iPort)
{ {
if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort) if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort)
{ {
std::string strIpPort = str(boost::format("%s %d") % strIp % iPort);
// Scan completed successfully. // Scan completed successfully.
{ {
ScopedLock sl(theApp->getWalletDB()->getDBLock()); ScopedLock sl(theApp->getWalletDB()->getDBLock());
Database *db=theApp->getWalletDB()->getDB(); Database *db=theApp->getWalletDB()->getDB();
db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=NULL,ScanInterval=0 WHERE IP=%s AND Port=%d;") db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=NULL,ScanInterval=0 WHERE IpPort=%s;")
% db->escape(strIp) % db->escape(strIpPort)));
% iPort));
// XXX Check error. // XXX Check error.
} }
@@ -252,8 +339,7 @@ void ConnectionPool::scanRefresh()
// Discover if there are entries that need scanning. // Discover if there are entries that need scanning.
boost::posix_time::ptime tpNext; boost::posix_time::ptime tpNext;
boost::posix_time::ptime tpNow; boost::posix_time::ptime tpNow;
std::string strIp; std::string strIpPort;
int iPort;
int iInterval; int iInterval;
{ {
@@ -269,8 +355,7 @@ void ConnectionPool::scanRefresh()
tpNext = ptFromSeconds(iNext); tpNext = ptFromSeconds(iNext);
tpNow = boost::posix_time::second_clock::universal_time(); tpNow = boost::posix_time::second_clock::universal_time();
db->getStr("IP", strIp); db->getStr("IpPort", strIpPort);
iPort = db->getInt("Port");
iInterval = db->getInt("ScanInterval"); iInterval = db->getInt("ScanInterval");
} }
else else
@@ -289,12 +374,12 @@ void ConnectionPool::scanRefresh()
else if (tpNext <= tpNow) else if (tpNext <= tpNow)
{ {
// Scan it. // Scan it.
splitIpPort(strIpPort, mScanIp, mScanPort);
(void) mScanTimer.cancel(); (void) mScanTimer.cancel();
std::cerr << "scanRefresh: scanning: " << strIp << " " << iPort << std::endl; std::cerr << "scanRefresh: scanning: " << mScanIp << " " << mScanPort << std::endl;
bScanning = true; bScanning = true;
mScanIp = strIp;
mScanPort = iPort;
iInterval *= 2; iInterval *= 2;
iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN); iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN);
@@ -305,11 +390,10 @@ void ConnectionPool::scanRefresh()
ScopedLock sl(theApp->getWalletDB()->getDBLock()); ScopedLock sl(theApp->getWalletDB()->getDBLock());
Database *db=theApp->getWalletDB()->getDB(); Database *db=theApp->getWalletDB()->getDB();
db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IP=%s AND Port=%d;") db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;")
% iToSeconds(tpNext) % iToSeconds(tpNext)
% iInterval % iInterval
% db->escape(strIp) % db->escape(strIpPort)));
% iPort));
// XXX Check error. // XXX Check error.
} }

View File

@@ -17,6 +17,7 @@ private:
boost::mutex mPeerLock; boost::mutex mPeerLock;
typedef std::pair<NewcoinAddress, Peer::pointer> naPeer; typedef std::pair<NewcoinAddress, Peer::pointer> naPeer;
typedef std::pair<ipPort, Peer::pointer> pipPeer;
// Peers we are connecting with and non-thin peers we are connected to. // Peers we are connecting with and non-thin peers we are connected to.
boost::unordered_map<ipPort, Peer::pointer> mIpMap; boost::unordered_map<ipPort, Peer::pointer> mIpMap;
@@ -33,6 +34,11 @@ private:
void scanHandler(const boost::system::error_code& ecResult); void scanHandler(const boost::system::error_code& ecResult);
// Peers we are establishing a connection with as a client.
// int miConnectStarting;
bool peerAvailable(std::string& strIp, int& iPort);
public: public:
ConnectionPool(boost::asio::io_service& io_service); ConnectionPool(boost::asio::io_service& io_service);
@@ -73,6 +79,12 @@ public:
void scanRefresh(); void scanRefresh();
//
// Connection policy
//
void policyLowWater();
void policyEnforce();
#if 0 #if 0
//std::vector<std::pair<PackedMessage::pointer,int> > mBroadcastMessages; //std::vector<std::pair<PackedMessage::pointer,int> > mBroadcastMessages;

View File

@@ -198,13 +198,11 @@ const char *WalletDBInit[] = {
// ScanInterval: // ScanInterval:
// Delay between scans. // Delay between scans.
"CREATE TABLE PeerIps ( \ "CREATE TABLE PeerIps ( \
IP TEXT NOT NULL, \ IpPort TEXT NOT NULL PRIMARY KEY, \
Port INTEGER NOT NULL DEFAULT -1, \
Score INTEGER NOT NULL, \ Score INTEGER NOT NULL, \
Source CHARACTER(1) NOT NULL, \ Source CHARACTER(1) NOT NULL, \
ScanNext DATETIME DEFAULT 0, \ ScanNext DATETIME DEFAULT 0, \
ScanInterval INTEGER NOT NULL DEFAULT 0, \ ScanInterval INTEGER NOT NULL DEFAULT 0 \
PRIMARY KEY (IP,Port) \
);", );",
"CREATE INDEX PeerScanIndex ON \ "CREATE INDEX PeerScanIndex ON \

View File

@@ -516,6 +516,12 @@ void Peer::recvHello(newcoin::TMHello& packet)
// No longer connecting as client. // No longer connecting as client.
mClientConnect = false; mClientConnect = false;
} }
else
{
// XXX At this point we could add the inbound connection to our IP list. However, the inbound IP address might be that of
// a NAT. It would be best to only add it if and only if we can immediatly verify it.
nothing();
}
// XXX Set timer: connection is in grace period to be useful. // XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection type.) // XXX Set timer: connection idle (idle may vary depending on connection type.)

View File

@@ -409,18 +409,16 @@ void UniqueNodeList::scoreCompute()
BOOST_FOREACH(ipScore, umScore) BOOST_FOREACH(ipScore, umScore)
{ {
ipPort ipEndpoint = ipScore.first; ipPort ipEndpoint = ipScore.first;
std::string strIP = ipEndpoint.first; std::string strIpPort = str(boost::format("%s %d") % ipEndpoint.first % ipEndpoint.second);
int iPort = ipEndpoint.second;
score iPoints = ipScore.second; score iPoints = ipScore.second;
vstrValues.push_back(str(boost::format("(%s,%d,%d,'V')") vstrValues.push_back(str(boost::format("(%s,%d,'V')")
% db->escape(strIP) % db->escape(strIpPort)
% iPort
% iPoints)); % iPoints));
} }
// Set scores for each IP. // Set scores for each IP.
db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IP,Port,Score,Source) VALUES %s;") db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IpPort,Score,Source) VALUES %s;")
% strJoin(vstrValues.begin(), vstrValues.end(), ","))); % strJoin(vstrValues.begin(), vstrValues.end(), ",")));
} }