Initial scaning support

This commit is contained in:
Arthur Britto
2012-04-30 13:23:54 -07:00
parent 71b5e97ad5
commit b341e02eba
5 changed files with 197 additions and 19 deletions

View File

@@ -44,6 +44,7 @@ DatabaseCon::~DatabaseCon()
Application::Application() :
mUNL(mIOService),
mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL),
mConnectionPool(mIOService),
mPeerDoor(NULL), mRPCDoor(NULL)
{
nothing();

View File

@@ -1,18 +1,19 @@
#include <boost/asio.hpp>
#include <boost/foreach.hpp>
#include "ConnectionPool.h"
#include "Config.h"
#include "Peer.h"
#include "Application.h"
#include "utils.h"
// XXX On Windows make sure OpenSSL PRNG is seeded: EGADS
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <boost/format.hpp>
ConnectionPool::ConnectionPool() :
iConnecting(0),
mCtx(boost::asio::ssl::context::sslv23)
ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) :
mCtx(boost::asio::ssl::context::sslv23),
bScanning(false),
mScanTimer(io_service)
{
mCtx.set_options(
boost::asio::ssl::context::default_workarounds
@@ -26,6 +27,9 @@ ConnectionPool::ConnectionPool() :
void ConnectionPool::start()
{
// XXX Start running policy.
// Start scanning.
scanRefresh();
}
// XXX Broken: also don't send a message to a peer if we got it from the peer.
@@ -75,6 +79,7 @@ bool ConnectionPool::peerRegister(Peer::pointer peer, const std::string& strIp,
bool ConnectionPool::connectTo(const std::string& strIp, int iPort)
{
bool bConnecting;
ipPort ip = make_pair(strIp, iPort);
boost::unordered_map<ipPort, Peer::pointer>::iterator it;
@@ -94,15 +99,19 @@ bool ConnectionPool::connectTo(const std::string& strIp, int iPort)
mIpMap[ip] = peer;
peer->connect(strIp, iPort);
bConnecting = true;
}
else
{
// Found it. Already connected.
std::cerr << "ConnectionPool::connectTo: Already connected: "
<< strIp << " " << iPort << std::endl;
bConnecting = false;
}
return true;
return bConnecting;
}
Json::Value ConnectionPool::getPeersJson()
@@ -129,7 +138,7 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& na)
if (na == theApp->getWallet().getNodePublic())
{
std::cerr << "ConnectionPool::peerConnected: To self." << std::endl;
bSuccess = false;
bSuccess = false;
}
else
{
@@ -183,6 +192,143 @@ void ConnectionPool::peerDisconnected(Peer::pointer peer, const ipPort& ipPeer,
}
}
void ConnectionPool::peerFailed(const std::string& strIp, int iPort)
{
if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort)
{
bScanning = false;
scanRefresh();
}
}
void ConnectionPool::peerVerified(const std::string& strIp, int iPort)
{
if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort)
{
// Scan completed successfully.
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
Database *db=theApp->getWalletDB()->getDB();
db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=NULL,ScanInterval=0 WHERE IP=%s AND Port=%d;")
% db->escape(strIp)
% iPort));
// XXX Check error.
}
bScanning = false;
scanRefresh();
}
}
void ConnectionPool::scanHandler(const boost::system::error_code& ecResult)
{
if (ecResult == boost::asio::error::operation_aborted)
{
nothing();
}
else if (!ecResult)
{
scanRefresh();
}
else
{
throw std::runtime_error("Internal error: unexpected deadline error.");
}
}
// Scan ips as per db entries.
void ConnectionPool::scanRefresh()
{
if (bScanning)
{
// Currently scanning, will scan again after completion.
std::cerr << "scanRefresh: already scanning" << std::endl;
nothing();
}
else
{
// Discover if there are entries that need scanning.
boost::posix_time::ptime tpNext;
boost::posix_time::ptime tpNow;
std::string strIp;
int iPort;
int iInterval;
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
Database *db=theApp->getWalletDB()->getDB();
if (db->executeSQL("SELECT * FROM PeerIps INDEXED BY PeerScanIndex WHERE ScanNext NOT NULL ORDER BY ScanNext LIMIT 1;")
&& db->startIterRows())
{
// Have an entry to scan.
int iNext = db->getInt("ScanNext");
tpNext = ptFromSeconds(iNext);
tpNow = boost::posix_time::second_clock::universal_time();
db->getStr("IP", strIp);
iPort = db->getInt("Port");
iInterval = db->getInt("ScanInterval");
}
else
{
// No entries to scan.
tpNow = boost::posix_time::ptime(boost::posix_time::not_a_date_time);
}
}
if (tpNow.is_not_a_date_time())
{
std::cerr << "scanRefresh: no scan needed." << std::endl;
(void) mScanTimer.cancel();
}
else if (tpNext <= tpNow)
{
// Scan it.
(void) mScanTimer.cancel();
std::cerr << "scanRefresh: scanning: " << strIp << " " << iPort << std::endl;
bScanning = true;
mScanIp = strIp;
mScanPort = iPort;
iInterval *= 2;
iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN);
tpNext = tpNow + boost::posix_time::seconds(iInterval);
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
Database *db=theApp->getWalletDB()->getDB();
db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IP=%s AND Port=%d;")
% iToSeconds(tpNext)
% iInterval
% db->escape(strIp)
% iPort));
// XXX Check error.
}
if (!connectTo(mScanIp, mScanPort))
{
// Already connected. Try again.
scanRefresh();
}
}
else
{
std::cerr << "scanRefresh: next due: " << tpNow << std::endl;
mScanTimer.expires_at(tpNext);
mScanTimer.async_wait(boost::bind(&ConnectionPool::scanHandler, this, _1));
}
}
}
#if 0
bool ConnectionPool::isMessageKnown(PackedMessage::pointer msg)
{

View File

@@ -16,22 +16,25 @@ class ConnectionPool
private:
boost::mutex mPeerLock;
typedef std::pair<NewcoinAddress, Peer::pointer> naPeer;
// Count of peers we are in progress of connecting to.
// We are in progress until we know their network public key.
int iConnecting;
typedef std::pair<NewcoinAddress, Peer::pointer> naPeer;
// Peers we are connecting with and non-thin peers we are connected to.
boost::unordered_map<ipPort, Peer::pointer> mIpMap;
boost::unordered_map<ipPort, Peer::pointer> mIpMap;
// Non-thin peers which we are connected to.
boost::unordered_map<NewcoinAddress, Peer::pointer> mConnectedMap;
boost::unordered_map<NewcoinAddress, Peer::pointer> mConnectedMap;
boost::asio::ssl::context mCtx;
bool bScanning;
boost::asio::deadline_timer mScanTimer;
std::string mScanIp;
int mScanPort;
void scanHandler(const boost::system::error_code& ecResult);
public:
ConnectionPool();
ConnectionPool(boost::asio::io_service& io_service);
// Begin enforcing connection policy.
void start();
@@ -56,8 +59,20 @@ public:
// No longer connected.
void peerDisconnected(Peer::pointer peer, const ipPort& ipPeer, const NewcoinAddress& naPeer);
// As client accepted.
void peerVerified(const std::string& strIp, int iPort);
// As client failed connect and be accepted.
void peerFailed(const std::string& strIp, int iPort);
Json::Value getPeersJson();
//
// Scanning
//
void scanRefresh();
#if 0
//std::vector<std::pair<PackedMessage::pointer,int> > mBroadcastMessages;

View File

@@ -63,6 +63,9 @@ void Peer::detach()
// mSocketSsl.close();
if (!mIpPort.first.empty()) {
if (mClientConnect)
theApp->getConnectionPool().peerFailed(mIpPort.first, mIpPort.second);
theApp->getConnectionPool().peerDisconnected(shared_from_this(), mIpPort, mNodePublic);
mIpPort.first.clear();
}
@@ -97,6 +100,8 @@ void Peer::connect(const std::string strIp, int iPort)
{
int iPortAct = iPort < 0 ? SYSTEM_PEER_PORT : iPort;
mClientConnect = true;
std::cerr << "Peer::connect: " << strIp << " " << iPort << std::endl;
mIpPort = make_pair(strIp, iPort);
@@ -156,7 +161,7 @@ void Peer::handleStart(const boost::system::error_code& error)
}
}
// Connect as client.
// Connect ssl as client.
void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it)
{
if (error)
@@ -176,13 +181,15 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip
}
}
// Connect as server.
// Connect ssl as server.
void Peer::connected(const boost::system::error_code& error)
{
boost::asio::ip::tcp::endpoint ep = mSocketSsl.lowest_layer().remote_endpoint();
int iPort = ep.port();
std::string strIp = ep.address().to_string();
mClientConnect = false;
if (iPort == SYSTEM_PEER_PORT)
iPort = -1;
@@ -502,6 +509,14 @@ void Peer::recvHello(newcoin::TMHello& packet)
// Cancel verification timeout.
(void) mVerifyTimer.cancel();
if (mClientConnect)
{
theApp->getConnectionPool().peerVerified(mIpPort.first, mIpPort.second);
// No longer connecting as client.
mClientConnect = false;
}
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection type.)

View File

@@ -31,6 +31,7 @@ public:
void handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it);
private:
bool mClientConnect; // In process of connecting as client.
NewcoinAddress mNodePublic; // Node public key of peer.
ipPort mIpPort;
uint256 mCookieHash;