Add ripple_net module

This commit is contained in:
Vinnie Falco
2013-05-22 17:02:40 -07:00
parent a5360379ac
commit f4cb47fed6
12 changed files with 416 additions and 301 deletions

View File

@@ -15,8 +15,6 @@
#include "utils.h"
#include "Log.h"
SETUP_LOG();
// How often to enforce policies.
#define POLICY_INTERVAL_SECONDS 5
@@ -169,7 +167,7 @@ void ConnectionPool::policyLowWater()
if (getPeerCount() > theConfig.PEER_CONNECT_LOW_WATER)
{
// Above low water mark, don't need more connections.
cLog(lsTRACE) << "Pool: Low water: sufficient connections: " << mConnectedMap.size() << "/" << theConfig.PEER_CONNECT_LOW_WATER;
WriteLog (lsTRACE, ConnectionPool) << "Pool: Low water: sufficient connections: " << mConnectedMap.size() << "/" << theConfig.PEER_CONNECT_LOW_WATER;
nothing();
}
@@ -183,7 +181,7 @@ void ConnectionPool::policyLowWater()
else if (!peerAvailable(strIp, iPort))
{
// No more connections available to start.
cLog(lsTRACE) << "Pool: Low water: no peers available.";
WriteLog (lsTRACE, ConnectionPool) << "Pool: Low water: no peers available.";
// XXX Might ask peers for more ips.
nothing();
@@ -191,11 +189,11 @@ void ConnectionPool::policyLowWater()
else
{
// Try to start connection.
cLog(lsTRACE) << "Pool: Low water: start connection.";
WriteLog (lsTRACE, ConnectionPool) << "Pool: Low water: start connection.";
if (!peerConnect(strIp, iPort))
{
cLog(lsINFO) << "Pool: Low water: already connected.";
WriteLog (lsINFO, ConnectionPool) << "Pool: Low water: already connected.";
}
// Check if we need more.
@@ -213,7 +211,7 @@ void ConnectionPool::policyEnforce()
if (((++mPhase) % 12) == 0)
{
cLog(lsTRACE) << "Making configured connections";
WriteLog (lsTRACE, ConnectionPool) << "Making configured connections";
makeConfigured();
}
@@ -321,11 +319,11 @@ Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort)
if (ppResult)
{
ppResult->connect(strIp, iPort);
cLog(lsDEBUG) << "Pool: Connecting: " << strIp << " " << iPort;
WriteLog (lsDEBUG, ConnectionPool) << "Pool: Connecting: " << strIp << " " << iPort;
}
else
{
cLog(lsTRACE) << "Pool: Already connected: " << strIp << " " << iPort;
WriteLog (lsTRACE, ConnectionPool) << "Pool: Already connected: " << strIp << " " << iPort;
}
return ppResult;
@@ -386,7 +384,7 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer,
if (naPeer == theApp->getWallet().getNodePublic())
{
cLog(lsINFO) << "Pool: Connected: self: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: self: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
}
else
{
@@ -396,7 +394,7 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer,
if (itCm == mConnectedMap.end())
{
// New connection.
//cLog(lsINFO) << "Pool: Connected: new: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
//WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: new: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
mConnectedMap[naPeer] = peer;
bNew = true;
@@ -412,7 +410,7 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer,
if (itCm->second->getIP().empty())
{
// Old peer did not know it's IP.
//cLog(lsINFO) << "Pool: Connected: redundant: outbound: " << ADDRESS_SHARED(peer) << " discovered: " << ADDRESS_SHARED(itCm->second) << ": " << strIP << " " << iPort;
//WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: redundant: outbound: " << ADDRESS_SHARED(peer) << " discovered: " << ADDRESS_SHARED(itCm->second) << ": " << strIP << " " << iPort;
itCm->second->setIpPort(strIP, iPort);
@@ -422,14 +420,14 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer,
else
{
// Old peer knew its IP. Do nothing.
//cLog(lsINFO) << "Pool: Connected: redundant: outbound: rediscovered: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
//WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: redundant: outbound: rediscovered: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
nothing();
}
}
else
{
//cLog(lsINFO) << "Pool: Connected: redundant: inbound: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
//WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: redundant: inbound: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
nothing();
}
@@ -450,12 +448,12 @@ void ConnectionPool::peerDisconnected(Peer::ref peer, const RippleAddress& naPee
if (itCm == mConnectedMap.end())
{
// Did not find it. Not already connecting or connected.
cLog(lsWARNING) << "Pool: disconnected: Internal Error: mConnectedMap was inconsistent.";
WriteLog (lsWARNING, ConnectionPool) << "Pool: disconnected: Internal Error: mConnectedMap was inconsistent.";
// XXX Maybe bad error, considering we have racing connections, may not so bad.
}
else if (itCm->second != peer)
{
cLog(lsWARNING) << "Pool: disconected: non canonical entry";
WriteLog (lsWARNING, ConnectionPool) << "Pool: disconected: non canonical entry";
nothing();
}
@@ -464,12 +462,12 @@ void ConnectionPool::peerDisconnected(Peer::ref peer, const RippleAddress& naPee
// Found it. Delete it.
mConnectedMap.erase(itCm);
//cLog(lsINFO) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort();
//WriteLog (lsINFO, ConnectionPool) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort();
}
}
else
{
//cLog(lsINFO) << "Pool: disconnected: anonymous: " << peer->getIP() << " " << peer->getPort();
//WriteLog (lsINFO, ConnectionPool) << "Pool: disconnected: anonymous: " << peer->getIP() << " " << peer->getPort();
}
assert(peer->getPeerId() != 0);
@@ -498,7 +496,7 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort)
boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time();
boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval);
//cLog(lsINFO) << str(boost::format("Pool: Scan: schedule create: %s %s (next %s, delay=%d)")
//WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: schedule create: %s %s (next %s, delay=%d)")
// % mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds());
db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;")
@@ -514,14 +512,14 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort)
// boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time();
// boost::posix_time::ptime tpNext = ptFromSeconds(db->getInt("ScanNext"));
//cLog(lsINFO) << str(boost::format("Pool: Scan: schedule exists: %s %s (next %s, delay=%d)")
//WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: schedule exists: %s %s (next %s, delay=%d)")
// % mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds());
}
db->endIterRows();
}
else
{
//cLog(lsWARNING) << "Pool: Scan: peer wasn't in PeerIps: " << strIp << " " << iPort;
//WriteLog (lsWARNING, ConnectionPool) << "Pool: Scan: peer wasn't in PeerIps: " << strIp << " " << iPort;
}
return bScanDirty;
@@ -536,7 +534,7 @@ void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iP
// If the connection was our scan, we are no longer scanning.
if (mScanning && mScanning == peer)
{
//cLog(lsINFO) << "Pool: Scan: scan fail: " << strIp << " " << iPort;
//WriteLog (lsINFO, ConnectionPool) << "Pool: Scan: scan fail: " << strIp << " " << iPort;
mScanning.reset(); // No longer scanning.
bScanRefresh = true; // Look for more to scan.
@@ -551,13 +549,13 @@ void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iP
if (itIp == mIpMap.end())
{
// Did not find it. Not already connecting or connected.
cLog(lsWARNING) << "Pool: Closed: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
WriteLog (lsWARNING, ConnectionPool) << "Pool: Closed: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
// XXX Internal error.
}
else if (mIpMap[ipPeer] == peer)
{
// We were the identified connection.
//cLog(lsINFO) << "Pool: Closed: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
//WriteLog (lsINFO, ConnectionPool) << "Pool: Closed: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
// Delete our entry.
mIpMap.erase(itIp);
@@ -567,7 +565,7 @@ void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iP
else
{
// Found it. But, we were redundant.
//cLog(lsINFO) << "Pool: Closed: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
//WriteLog (lsINFO, ConnectionPool) << "Pool: Closed: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
}
}
@@ -591,7 +589,7 @@ void ConnectionPool::peerVerified(Peer::ref peer)
std::string strIpPort = str(boost::format("%s %d") % strIp % iPort);
//cLog(lsINFO) << str(boost::format("Pool: Scan: connected: %s %s %s (scanned)") % ADDRESS_SHARED(peer) % strIp % iPort);
//WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: connected: %s %s %s (scanned)") % ADDRESS_SHARED(peer) % strIp % iPort);
if (peer->getNodePublic() == theApp->getWallet().getNodePublic())
{
@@ -656,7 +654,7 @@ void ConnectionPool::scanRefresh()
else if (mScanning)
{
// Currently scanning, will scan again after completion.
cLog(lsTRACE) << "Pool: Scan: already scanning";
WriteLog (lsTRACE, ConnectionPool) << "Pool: Scan: already scanning";
nothing();
}
@@ -694,7 +692,7 @@ void ConnectionPool::scanRefresh()
if (tpNow.is_not_a_date_time())
{
//cLog(lsINFO) << "Pool: Scan: stop.";
//WriteLog (lsINFO, ConnectionPool) << "Pool: Scan: stop.";
(void) mScanTimer.cancel();
}
@@ -709,7 +707,7 @@ void ConnectionPool::scanRefresh()
tpNext = tpNow + boost::posix_time::seconds(iInterval);
//cLog(lsINFO) << str(boost::format("Pool: Scan: Now: %s %s (next %s, delay=%d)")
//WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: Now: %s %s (next %s, delay=%d)")
// % mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds());
iInterval *= 2;
@@ -734,7 +732,7 @@ void ConnectionPool::scanRefresh()
}
else
{
//cLog(lsINFO) << str(boost::format("Pool: Scan: Next: %s (next %s, delay=%d)")
//WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: Next: %s (next %s, delay=%d)")
// % strIpPort % tpNext % (tpNext-tpNow).total_seconds());
mScanTimer.expires_at(tpNext);