Transmit node public and don't talk to self or redundant nodes.

This commit is contained in:
Arthur Britto
2012-04-27 14:09:13 -07:00
parent fe66dcfdc2
commit b49b0d59a5
6 changed files with 251 additions and 178 deletions

View File

@@ -34,7 +34,7 @@ void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg)
// Inbound connection, false=reject // Inbound connection, false=reject
// Reject addresses we already have in our table. // Reject addresses we already have in our table.
// XXX Reject, if we have too many connections. // XXX Reject, if we have too many connections.
bool ConnectionPool::peerAccepted(Peer::pointer peer, const std::string& strIp, int iPort) bool ConnectionPool::peerRegister(Peer::pointer peer, const std::string& strIp, int iPort)
{ {
bool bAccept; bool bAccept;
ipPort ip = make_pair(strIp, iPort); ipPort ip = make_pair(strIp, iPort);
@@ -49,7 +49,7 @@ bool ConnectionPool::peerAccepted(Peer::pointer peer, const std::string& strIp,
{ {
// Did not find it. Not already connecting or connected. // Did not find it. Not already connecting or connected.
std::cerr << "ConnectionPool::peerAccepted: " << ip.first << " " << ip.second << std::endl; std::cerr << "ConnectionPool::peerRegister: " << ip.first << " " << ip.second << std::endl;
// Mark as connecting. // Mark as connecting.
mIpMap[ip] = peer; mIpMap[ip] = peer;
bAccept = true; bAccept = true;
@@ -77,6 +77,8 @@ bool ConnectionPool::connectTo(const std::string& strIp, int iPort)
if (it == mIpMap.end()) if (it == mIpMap.end())
{ {
// Did not find it. Not already connecting or connected. // Did not find it. Not already connecting or connected.
std::cerr << "ConnectionPool::connectTo: Connectting: "
<< strIp << " " << iPort << std::endl;
Peer::pointer peer(Peer::create(theApp->getIOService())); Peer::pointer peer(Peer::create(theApp->getIOService()));
@@ -108,9 +110,24 @@ Json::Value ConnectionPool::getPeersJson()
return ret; return ret;
} }
void ConnectionPool::peerConnected(Peer::pointer peer) // Now know peer's node public key. Determine if we want to stay connected.
bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& na)
{ {
std::cerr << "ConnectionPool::peerConnected" << std::endl; bool bSuccess;
std::cerr << "ConnectionPool::peerConnected: " << na.humanNodePublic() << std::endl;
if (na == theApp->getWallet().getNodePublic())
{
std::cerr << "ConnectionPool::peerConnected: To self." << std::endl;
bSuccess = false;
}
else
{
bSuccess = true;
}
return bSuccess;
} }
void ConnectionPool::peerDisconnected(Peer::pointer peer) void ConnectionPool::peerDisconnected(Peer::pointer peer)
@@ -147,7 +164,8 @@ void ConnectionPool::peerDisconnected(Peer::pointer peer)
if (itIp == mIpMap.end()) if (itIp == mIpMap.end())
{ {
// Did not find it. Not already connecting or connected. // Did not find it. Not already connecting or connected.
std::cerr << "Internal Error: peer wasn't connected." << std::endl; std::cerr << "Internal Error: peer wasn't connected: "
<< peer->mIpPort.first << " " << peer->mIpPort.second << std::endl;
// XXX Bad error. // XXX Bad error.
} }
else else

View File

@@ -45,10 +45,10 @@ public:
// //
// Inbound connection, false=reject // Inbound connection, false=reject
bool peerAccepted(Peer::pointer peer, const std::string& strIp, int iPort); bool peerRegister(Peer::pointer peer, const std::string& strIp, int iPort);
// We know peers node public key. // We know peers node public key. false=reject
void peerConnected(Peer::pointer peer); bool peerConnected(Peer::pointer peer, const NewcoinAddress& na);
// No longer connected. // No longer connected.
void peerDisconnected(Peer::pointer peer); void peerDisconnected(Peer::pointer peer);

View File

@@ -14,6 +14,7 @@
#include "Application.h" #include "Application.h"
#include "Conversion.h" #include "Conversion.h"
#include "SerializedTransaction.h" #include "SerializedTransaction.h"
#include "utils.h"
Peer::Peer(boost::asio::io_service& io_service) Peer::Peer(boost::asio::io_service& io_service)
: mSocket(io_service), : mSocket(io_service),
@@ -26,7 +27,7 @@ void Peer::handle_write(const boost::system::error_code& error, size_t bytes_tra
{ {
#ifdef DEBUG #ifdef DEBUG
if(error) if(error)
std::cout << "Peer::handle_write Error: " << error << " bytes: "<< bytes_transferred << std::endl; std::cout << "Peer::handle_write Error: " << error << " bytes: " << bytes_transferred << std::endl;
else else
std::cout << "Peer::handle_write bytes: "<< bytes_transferred << std::endl; std::cout << "Peer::handle_write bytes: "<< bytes_transferred << std::endl;
#endif #endif
@@ -55,7 +56,10 @@ void Peer::detach()
mSendQ.clear(); mSendQ.clear();
mSocket.close(); mSocket.close();
if (!mIpPort.first.empty()) {
theApp->getConnectionPool().peerDisconnected(shared_from_this()); theApp->getConnectionPool().peerDisconnected(shared_from_this());
mIpPort.first.clear();
}
} }
// Begin trying to connect. We are not connected till we know and accept peer's public key. // Begin trying to connect. We are not connected till we know and accept peer's public key.
@@ -64,6 +68,9 @@ void Peer::connect(const std::string strIp, int iPort)
{ {
int iPortAct = iPort < 0 ? SYSTEM_PEER_PORT : iPort; int iPortAct = iPort < 0 ? SYSTEM_PEER_PORT : iPort;
std::cout << "Peer::connect: " << strIp << " " << iPort << std::endl;
mIpPort = make_pair(strIp, iPort);
boost::asio::ip::tcp::resolver::query query(strIp, boost::lexical_cast<std::string>(iPortAct), boost::asio::ip::tcp::resolver::query query(strIp, boost::lexical_cast<std::string>(iPortAct),
boost::asio::ip::resolver_query_base::numeric_host|boost::asio::ip::resolver_query_base::numeric_service); boost::asio::ip::resolver_query_base::numeric_host|boost::asio::ip::resolver_query_base::numeric_service);
boost::asio::ip::tcp::resolver resolver(theApp->getIOService()); boost::asio::ip::tcp::resolver resolver(theApp->getIOService());
@@ -72,12 +79,13 @@ void Peer::connect(const std::string strIp, int iPort)
if (err || itrEndpoint == boost::asio::ip::tcp::resolver::iterator()) if (err || itrEndpoint == boost::asio::ip::tcp::resolver::iterator())
{ {
std::cerr << "Peer::connect: Bad IP" << std::endl;
// Failed to resolve ip. // Failed to resolve ip.
detach(); detach();
} }
else else
{ {
mIpPort = make_pair(strIp, iPort); std::cerr << "Peer::connect: Connectting: " << mIpPort.first << " " << mIpPort.second << std::endl;
#if 1 #if 1
boost::asio::async_connect( boost::asio::async_connect(
mSocket, mSocket,
@@ -112,9 +120,9 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip
else else
{ {
std::cout << "Socket Connected." << std::endl; std::cout << "Socket Connected." << std::endl;
// XXX Exchange public keys.
sendHello();
start_read_header(); start_read_header();
sendHello();
} }
} }
@@ -135,7 +143,7 @@ void Peer::connected(const boost::system::error_code& error)
std::cout << "Remote peer: accept error: " << error << std::endl; std::cout << "Remote peer: accept error: " << error << std::endl;
detach(); detach();
} }
else if (!theApp->getConnectionPool().peerAccepted(shared_from_this(), strIp, iPort)) else if (!theApp->getConnectionPool().peerRegister(shared_from_this(), strIp, iPort))
{ {
std::cout << "Remote peer: rejecting." << std::endl; std::cout << "Remote peer: rejecting." << std::endl;
// XXX Reject with a rejection message: already connected // XXX Reject with a rejection message: already connected
@@ -143,13 +151,15 @@ void Peer::connected(const boost::system::error_code& error)
} }
else else
{ {
// Not redundant, add to connection list.
std::cout << "Remote peer: accepted." << std::endl; std::cout << "Remote peer: accepted." << std::endl;
//BOOST_LOG_TRIVIAL(info) << "Connected to Peer."; //BOOST_LOG_TRIVIAL(info) << "Connected to Peer.";
// Not redundant, add to connection list. mIpPort = make_pair(strIp, iPort);
// XXX Exchange public keys.
sendHello();
start_read_header(); start_read_header();
sendHello();
} }
} }
@@ -240,6 +250,15 @@ void Peer::processReadBuffer()
#ifdef DEBUG #ifdef DEBUG
std::cerr << "PRB(" << type << "), len=" << (mReadbuf.size()-HEADER_SIZE) << std::endl; std::cerr << "PRB(" << type << "), len=" << (mReadbuf.size()-HEADER_SIZE) << std::endl;
#endif #endif
if (mIpPort.first.empty() == (type == newcoin::mtHELLO))
{
// If not connectted, only accept mtHELLO. Otherwise, don't accept mtHELLO.
std::cerr << "Wrong message type: " << type << std::endl;
detach();
}
else
{
switch(type) switch(type)
{ {
case newcoin::mtHELLO: case newcoin::mtHELLO:
@@ -341,7 +360,7 @@ void Peer::processReadBuffer()
} }
break; break;
#if 0 #if 0
case newcoin::mtPROPOSE_LEDGER: case newcoin::mtPROPOSE_LEDGER:
{ {
newcoin::TM msg; newcoin::TM msg;
@@ -378,7 +397,7 @@ void Peer::processReadBuffer()
} }
break; break;
#endif #endif
case newcoin::mtGET_OBJECT: case newcoin::mtGET_OBJECT:
{ {
@@ -401,14 +420,44 @@ void Peer::processReadBuffer()
default: default:
std::cout << "Unknown Msg: " << type << std::endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error; std::cout << "Unknown Msg: " << type << std::endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error;
} }
}
} }
void Peer::recvHello(newcoin::TMHello& packet) void Peer::recvHello(newcoin::TMHello& packet)
{ {
#ifdef DEBUG #ifdef DEBUG
std::cerr << "Recv(Hello) v=" << packet.version() << ", index=" << packet.ledgerindex() << std::endl; std::cerr << "Recv(Hello) v=" << packet.version()
<< ", index=" << packet.ledgerindex()
<< std::endl;
#endif #endif
bool bDetach = true;
if (mPublicKey.isValid())
{
std::cerr << "Recv(Hello): Disconnect: Extraneous node public key." << std::endl;
}
else if (!mPublicKey.setNodePublic(packet.nodepublic()))
{
std::cerr << "Recv(Hello): Disconnect: Bad node public key." << std::endl;
}
else if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mPublicKey))
{
// Already connected, self, or some other reason.
std::cerr << "Recv(Hello): Disconnect: Extraneous connection." << std::endl;
}
else
{
// Successful connection.
// XXX Kill hello timer.
// XXX Set timer: connection is in grace period to be useful.
bDetach = false;
}
if (bDetach)
{
mPublicKey.clear();
detach();
}
} }
void Peer::recvTransaction(newcoin::TMTransaction& packet) void Peer::recvTransaction(newcoin::TMTransaction& packet)
@@ -605,11 +654,13 @@ void Peer::recvLedger(newcoin::TMLedgerData& packet)
void Peer::sendHello() void Peer::sendHello()
{ {
// XXX Start timer for hello required by.
newcoin::TMHello* h=new newcoin::TMHello(); newcoin::TMHello* h=new newcoin::TMHello();
// set up parameters // set up parameters
h->set_version(theConfig.VERSION); h->set_version(theConfig.VERSION);
h->set_ledgerindex(theApp->getOPs().getCurrentLedgerID()); h->set_ledgerindex(theApp->getOPs().getCurrentLedgerID());
h->set_nettime(theApp->getOPs().getNetworkTime()); h->set_nettime(theApp->getOPs().getNetworkTime());
h->set_nodepublic(theApp->getWallet().getNodePublic().humanNodePublic());
h->set_ipv4port(theConfig.PEER_PORT); h->set_ipv4port(theConfig.PEER_PORT);
Ledger::pointer closingLedger=theApp->getMasterLedger().getClosingLedger(); Ledger::pointer closingLedger=theApp->getMasterLedger().getClosingLedger();

View File

@@ -28,12 +28,14 @@ public:
static const int psbGotHello=0, psbSentHello=1, psbInMap=2, psbTrusted=3; static const int psbGotHello=0, psbSentHello=1, psbInMap=2, psbTrusted=3;
static const int psbNoLedgers=4, psbNoTransactions=5, psbDownLevel=6; static const int psbNoLedgers=4, psbNoTransactions=5, psbDownLevel=6;
NewcoinAddress mPublicKey; // Public key of peer. NewcoinAddress mPublicKey; // Node public key of peer.
ipPort mIpPort; ipPort mIpPort;
void handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it); void handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it);
private: private:
bool bRegistered;
boost::asio::ip::tcp::socket mSocket; boost::asio::ip::tcp::socket mSocket;
boost::asio::ssl::context mCtx; boost::asio::ssl::context mCtx;
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> mSocketSsl; boost::asio::ssl::stream<boost::asio::ip::tcp::socket> mSocketSsl;

View File

@@ -50,6 +50,8 @@ public:
// - Maintain peer connectivity through validation and peer management. // - Maintain peer connectivity through validation and peer management.
void start(); void start();
NewcoinAddress& getNodePublic() { return mNodePublicKey; }
NewcoinAddress addFamily(const std::string& passPhrase, bool lock); NewcoinAddress addFamily(const std::string& passPhrase, bool lock);
NewcoinAddress addFamily(const NewcoinAddress& familySeed, bool lock); NewcoinAddress addFamily(const NewcoinAddress& familySeed, bool lock);
NewcoinAddress addFamily(const NewcoinAddress& familyGenerator); NewcoinAddress addFamily(const NewcoinAddress& familyGenerator);

View File

@@ -38,7 +38,7 @@ message TMHello {
required uint32 version = 1; required uint32 version = 1;
optional uint32 ledgerIndex = 2; optional uint32 ledgerIndex = 2;
optional uint64 netTime = 3; optional uint64 netTime = 3;
optional bytes nodeID = 4; // node may opt to remain anonymous optional bytes nodePublic = 4; // node may opt to remain anonymous
optional uint32 ipv4Port = 5; optional uint32 ipv4Port = 5;
optional bytes closedLedger = 6; optional bytes closedLedger = 6;
} }