Work toward policy based network connectivity.

This commit is contained in:
Arthur Britto
2012-04-25 23:04:38 -07:00
parent ea809a0fe0
commit 396eca1a3c
15 changed files with 315 additions and 144 deletions

View File

@@ -9,7 +9,7 @@
AccountState::AccountState(const NewcoinAddress& id) : mAccountID(id), mValid(false)
{
if (!id.IsValid()) return;
if (!id.isValid()) return;
mLedgerEntry = boost::make_shared<SerializedLedgerEntry>(ltACCOUNT_ROOT);
mLedgerEntry->setIndex(Ledger::getAccountRootIndex(id));
mLedgerEntry->setIFieldAccount(sfAccount, id);
@@ -21,7 +21,7 @@ AccountState::AccountState(SerializedLedgerEntry::pointer ledgerEntry) : mLedger
if (!mLedgerEntry) return;
if (mLedgerEntry->getType()!=ltACCOUNT_ROOT) return;
mAccountID = mLedgerEntry->getValueFieldAccount(sfAccount);
if (mAccountID.IsValid()) mValid = true;
if (mAccountID.isValid()) mValid = true;
}
void AccountState::addJson(Json::Value& val)

View File

@@ -75,7 +75,6 @@ void Application::run()
// Begin validation and ip maintenance.
// - Wallet maintains local information: including identity and network connection persistency information.
//
mWallet.start();
//
@@ -94,10 +93,12 @@ void Application::run()
mRPCDoor=new RPCDoor(mIOService);
}//else BOOST_LOG_TRIVIAL(info) << "No RPC Port set. Not listening for commands.";
mConnectionPool.connectToNetwork(mIOService);
mTimingService.start(mIOService);
//
// Begin connectting to network.
//
mConnectionPool.start();
std::cout << "Before Run." << std::endl;
mTimingService.start(mIOService);
// Temporary root account will be ["This is my payphrase."]:0
NewcoinAddress rootFamilySeed; // Hold the 128 password.

View File

@@ -6,7 +6,7 @@
#include <fstream>
#include <boost/lexical_cast.hpp>
#define CONFIG_FILE_NAME "newcoind.cfg"
#define CONFIG_FILE_NAME SYSTEM_NAME "d.cfg" // newcoind.cfg
#define SECTION_PEER_IP "peer_ip"
#define SECTION_PEER_PORT "peer_port"
#define SECTION_RPC_IP "rpc_ip"
@@ -22,8 +22,7 @@ Config::Config()
NETWORK_START_TIME=1319844908;
PEER_PORT=6561;
PEER_PORT=SYSTEM_PEER_PORT;
RPC_PORT=5001;
NUMBER_CONNECTIONS=30;

View File

@@ -1,4 +1,11 @@
#include "string"
#ifndef __CONFIG__
#define __CONFIG__
#include <string>
#define SYSTEM_NAME "newcoin"
const int SYSTEM_PEER_PORT=6561;
class Config
{
@@ -8,8 +15,6 @@ public:
std::string VERSION_STR;
// network parameters
// std::string NETWORK_ID;
// std::string NETWORK_DNS_SEEDS;
int NETWORK_START_TIME; // The Unix time we start ledger 0
int TRANSACTION_FEE_BASE;
int LEDGER_SECONDS;
@@ -28,8 +33,6 @@ public:
// bool NODE_DUMB; // we are a 'dumb' client
// bool NODE_SMART; // we offer services to 'dumb' clients
// std::string HANKO_PRIVATE;
// RPC parameters
std::string RPC_IP;
int RPC_PORT;
@@ -48,3 +51,4 @@ public:
};
extern Config theConfig;
#endif

View File

@@ -1,65 +1,96 @@
#include <boost/foreach.hpp>
#include <boost/asio.hpp>
#include <boost/foreach.hpp>
#include "ConnectionPool.h"
#include "Config.h"
#include "Peer.h"
#include "Application.h"
#include "utils.h"
ConnectionPool::ConnectionPool()
ConnectionPool::ConnectionPool() :
iConnecting(0)
{ ; }
void ConnectionPool::connectToNetwork(boost::asio::io_service& io_service)
void ConnectionPool::start()
{
#if 0
for(int n=0; n<theConfig.NUMBER_CONNECTIONS; n++)
{
KnownNode* node=nodeList.getNextNode();
if(!node) return;
Peer::pointer peer=Peer::create(io_service);
// peer->connectTo(*node); // FIXME
mPeers.push_back(peer);
}
#endif
// XXX Start running policy.
}
// XXX Broken don't send a message to a peer if we got it from the peer.
void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg)
{
BOOST_FOREACH(Peer::pointer peer, mPeers)
BOOST_FOREACH(naPeer pair, mConnectedMap)
{
Peer::pointer peer = pair.second;
if(!fromPeer || !(peer.get() == fromPeer))
peer->sendPacket(msg);
}
}
bool ConnectionPool::connectTo(const std::string& host, const std::string& port)
// Inbound connection, false=reject
// Reject addresses we already have in our table.
// XXX Reject, if we have too many connections.
bool ConnectionPool::peerAccepted(Peer::pointer peer, const std::string& strIp, int iPort)
{
try
bool bAccept;
ipPort ip = make_pair(strIp, iPort);
boost::unordered_map<ipPort, Peer::pointer>::iterator it;
boost::mutex::scoped_lock sl(mPeerLock);
it = mIpMap.find(ip);
if (it == mIpMap.end())
{
boost::asio::ip::tcp::resolver res(theApp->getIOService());
boost::asio::ip::tcp::resolver::query query(host.c_str(), port.c_str());
boost::asio::ip::tcp::resolver::iterator it(res.resolve(query)), end;
// Did not find it. Not already connecting or connected.
std::cerr << "ConnectionPool::peerAccepted: " << ip.first << " " << ip.second << std::endl;
// Mark as connecting.
mIpMap[ip] = peer;
bAccept = true;
}
else
{
// Found it. Already connected or connecting.
bAccept = false;
}
return bAccept;
}
bool ConnectionPool::connectTo(const std::string& strIp, int iPort)
{
ipPort ip = make_pair(strIp, iPort);
boost::unordered_map<ipPort, Peer::pointer>::iterator it;
boost::mutex::scoped_lock sl(mPeerLock);
it = mIpMap.find(ip);
if (it == mIpMap.end())
{
// Did not find it. Not already connecting or connected.
Peer::pointer peer(Peer::create(theApp->getIOService()));
boost::system::error_code error = boost::asio::error::host_not_found;
while (error && (it!=end))
mIpMap[ip] = peer;
peer->connect(strIp, iPort);
}
else
{
peer->getSocket().close();
peer->getSocket().connect(*it++, error);
}
if(error) return false;
boost::mutex::scoped_lock sl(peerLock);
mPeers.push_back(peer);
peer->connected(boost::system::error_code());
}
catch (...)
{
return false;
// Found it. Already connected.
std::cerr << "ConnectionPool::connectTo: Already connected: "
<< strIp << " " << iPort << std::endl;
}
return true;
}
@@ -67,55 +98,67 @@ Json::Value ConnectionPool::getPeersJson()
{
Json::Value ret(Json::arrayValue);
BOOST_FOREACH(Peer::pointer peer, mPeers)
BOOST_FOREACH(naPeer pair, mConnectedMap)
{
Peer::pointer peer = pair.second;
ret.append(peer->getJson());
}
return ret;
}
void ConnectionPool::peerDisconnected(NewcoinAddress naPeer)
void ConnectionPool::peerConnected(Peer::pointer peer)
{
std::cerr << "ConnectionPool::peerDisconnected" << std::cerr;
std::cerr << "ConnectionPool::peerConnected" << std::endl;
}
void ConnectionPool::peerDisconnected(Peer::pointer peer)
{
std::cerr << "ConnectionPool::peerDisconnected: " << peer->mIpPort.first << " " << peer->mIpPort.second << std::endl;
boost::mutex::scoped_lock sl(mPeerLock);
// XXX Don't access member variable directly.
if (peer->mPublicKey.isValid())
{
// XXX Would be better if NewcoinAddress had a hash function.
boost::unordered_map<std::vector<unsigned char>, Peer::pointer>::iterator itCm;
itCm = mConnectedMap.find(peer->mPublicKey.getNodePublic());
if (itCm == mConnectedMap.end())
{
// Did not find it. Not already connecting or connected.
std::cerr << "Internal Error: peer connection was inconsistent." << std::endl;
// XXX Bad error.
}
else
{
// Found it. Delete it.
mConnectedMap.erase(itCm);
}
}
// XXX Don't access member variable directly.
boost::unordered_map<ipPort, Peer::pointer>::iterator itIp;
itIp = mIpMap.find(peer->mIpPort);
if (itIp == mIpMap.end())
{
// Did not find it. Not already connecting or connected.
std::cerr << "Internal Error: peer wasn't connected." << std::endl;
// XXX Bad error.
}
else
{
// Found it. Delete it.
mIpMap.erase(itIp);
}
}
#if 0
bool ConnectionPool::addToMap(const uint160& hanko, Peer::pointer peer)
{
boost::mutex::scoped_lock sl(peerLock);
return peerMap.insert(std::make_pair(hanko, peer)).second;
}
bool ConnectionPool::delFromMap(const uint160& hanko)
{
boost::mutex::scoped_lock sl(peerLock);
std::map<uint160, Peer::pointer>::iterator it=peerMap.find(hanko);
if((it==peerMap.end()) || (it->first!=hanko)) return false;
peerMap.erase(it);
return true;
}
bool ConnectionPool::inMap(const uint160& hanko)
{
boost::mutex::scoped_lock sl(peerLock);
return peerMap.find(hanko) != peerMap.end();
}
std::map<uint160, Peer::pointer> ConnectionPool::getAllConnected()
{
boost::mutex::scoped_lock sl(peerLock);
return peerMap;
}
Peer::pointer ConnectionPool::findInMap(const uint160& hanko)
{
boost::mutex::scoped_lock sl(peerLock);
std::map<uint160, Peer::pointer>::iterator it=peerMap.find(hanko);
if(it==peerMap.end()) return Peer::pointer();
return it->second;
}
bool ConnectionPool::isMessageKnown(PackedMessage::pointer msg)
{
for(unsigned int n=0; n<mBroadcastMessages.size(); n++)

View File

@@ -1,33 +1,57 @@
#ifndef __CONNECTION_POOL__
#define __CONNECTION_POOL__
#include <boost/asio.hpp>
#include <boost/thread/mutex.hpp>
#include "Peer.h"
#include "PackedMessage.h"
#include "types.h"
/*
This is the list of all the Peers we are currently connected to
*/
//
// Access to the Newcoin network.
//
class ConnectionPool
{
boost::mutex peerLock;
std::vector<Peer::pointer> mPeers; // FIXME
std::map<uint160, Peer::pointer> peerMap;
private:
boost::mutex mPeerLock;
typedef std::pair<std::vector<unsigned char>, Peer::pointer> naPeer;
// Count of peers we are in progress of connecting to.
// We are in progress until we know their network public key.
int iConnecting;
// Peers we are connecting with and non-thin peers we are connected to.
boost::unordered_map<ipPort, Peer::pointer> mIpMap;
// Non-thin peers which we are connected to.
boost::unordered_map<std::vector<unsigned char>, Peer::pointer> mConnectedMap;
public:
ConnectionPool();
void connectToNetwork(boost::asio::io_service& io_service);
// Begin enforcing connection policy.
void start();
// Send message to network.
void relayMessage(Peer* fromPeer, PackedMessage::pointer msg);
// Manual connection request.
// Queue for immediate scanning.
bool connectTo(const std::string& host, const std::string& port);
bool connectTo(const std::string& strIp, int iPort);
// Peer notification routines.
void peerDisconnected(NewcoinAddress naPeer);
//
// Peer connectivity notification.
//
// Inbound connection, false=reject
bool peerAccepted(Peer::pointer peer, const std::string& strIp, int iPort);
// We know peers node public key.
void peerConnected(Peer::pointer peer);
// No longer connected.
void peerDisconnected(Peer::pointer peer);
Json::Value getPeersJson();
@@ -35,14 +59,6 @@ public:
//std::vector<std::pair<PackedMessage::pointer,int> > mBroadcastMessages;
bool isMessageKnown(PackedMessage::pointer msg);
// hanko->peer mapping functions
bool addToMap(const uint160& hanko, Peer::pointer peer);
bool delFromMap(const uint160& hanko);
bool inMap(const uint160& hanko);
std::map<uint160, Peer::pointer> getAllConnected();
Peer::pointer findInMap(const uint160& hanko);
#endif
};

View File

@@ -16,7 +16,7 @@ NewcoinAddress::NewcoinAddress()
nVersion = VER_NONE;
}
bool NewcoinAddress::IsValid() const
bool NewcoinAddress::isValid() const
{
return !vchData.empty();
}

View File

@@ -27,7 +27,7 @@ private:
public:
NewcoinAddress();
bool IsValid() const;
bool isValid() const;
void clear();
//

View File

@@ -1,11 +1,11 @@
#include <iostream>
#include <boost/foreach.hpp>
//#include <boost/log/trivial.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <boost/make_shared.hpp>
#include <boost/ref.hpp>
//#include <boost/log/trivial.hpp>
#include "../json/writer.h"
@@ -16,7 +16,9 @@
#include "SerializedTransaction.h"
Peer::Peer(boost::asio::io_service& io_service)
: mSocket(io_service)
: mSocket(io_service),
mCtx(boost::asio::ssl::context::sslv23),
mSocketSsl(io_service, mCtx)
{
}
@@ -53,23 +55,101 @@ void Peer::detach()
mSendQ.clear();
mSocket.close();
// XXX Insufficient need to ip and port.
if(mPublicKey.IsValid()) theApp->getConnectionPool().peerDisconnected(mPublicKey);
theApp->getConnectionPool().peerDisconnected(shared_from_this());
}
void Peer::connected(const boost::system::error_code& error)
// Begin trying to connect. We are not connected till we know and accept peer's public key.
// Only takes IP addresses (not domains).
void Peer::connect(const std::string strIp, int iPort)
{
if(!error)
{
std::cout << "Connected to Peer." << std::endl; //BOOST_LOG_TRIVIAL(info) << "Connected to Peer.";
int iPortAct = iPort < 0 ? SYSTEM_PEER_PORT : iPort;
sendHello();
start_read_header();
boost::asio::ip::tcp::resolver::query query(strIp, boost::lexical_cast<std::string>(iPortAct),
boost::asio::ip::resolver_query_base::numeric_host|boost::asio::ip::resolver_query_base::numeric_service);
boost::asio::ip::tcp::resolver resolver(theApp->getIOService());
boost::system::error_code err;
boost::asio::ip::tcp::resolver::iterator itrEndpoint = resolver.resolve(query, err);
if (err || itrEndpoint == boost::asio::ip::tcp::resolver::iterator())
{
// Failed to resolve ip.
detach();
}
else
{
mIpPort = make_pair(strIp, iPort);
#if 1
boost::asio::async_connect(
mSocket,
itrEndpoint,
boost::bind(
&Peer::handleConnect,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::iterator));
#else
// Connect via ssl.
// XXX Why doesn't handler need an iterator?
boost::asio::async_connect(
mSocketSsl.lowest_layer(),
itrEndpoint,
boost::bind(
&Peer::handleConnect,
shared_from_this(),
boost::asio::placeholders::error));
#endif
}
}
// SSL connection.
void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it)
{
if (error)
{
std::cout << "Socket Connect failed:" << error << std::endl;
detach();
std::cout << "Peer::connected Error: " << error << std::endl; //else BOOST_LOG_TRIVIAL(info) << "Error: " << error;
}
else
{
std::cout << "Socket Connected." << std::endl;
// XXX Exchange public keys.
sendHello();
start_read_header();
}
}
// Peer connected via door.
void Peer::connected(const boost::system::error_code& error)
{
boost::asio::ip::tcp::endpoint ep = mSocket.remote_endpoint();
int iPort = ep.port();
std::string strIp = ep.address().to_string();
if (iPort == SYSTEM_PEER_PORT)
iPort = -1;
std::cout << "Remote peer: accept: " << strIp << " " << iPort << std::endl;
if (error)
{
std::cout << "Remote peer: accept error: " << error << std::endl;
detach();
}
else if (!theApp->getConnectionPool().peerAccepted(shared_from_this(), strIp, iPort))
{
std::cout << "Remote peer: rejecting." << std::endl;
// XXX Reject with a rejection message: already connected
detach();
}
else
{
std::cout << "Remote peer: accepted." << std::endl;
//BOOST_LOG_TRIVIAL(info) << "Connected to Peer.";
// Not redundant, add to connection list.
// XXX Exchange public keys.
sendHello();
start_read_header();
}
}

View File

@@ -2,9 +2,10 @@
#define __PEER__
#include <bitset>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include "../obj/src/newcoin.pb.h"
#include "PackedMessage.h"
@@ -19,19 +20,30 @@ enum PeerPunish
PP_UNWANTED_DATA=3, // The peer sent us data we didn't want/need
};
typedef std::pair<std::string,int> ipPort;
class Peer : public boost::enable_shared_from_this<Peer>
{
public:
static const int psbGotHello=0, psbSentHello=1, psbInMap=2, psbTrusted=3;
static const int psbNoLedgers=4, psbNoTransactions=5, psbDownLevel=6;
protected:
NewcoinAddress mPublicKey; // Public key of peer.
ipPort mIpPort;
void handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it);
private:
boost::asio::ip::tcp::socket mSocket;
boost::asio::ssl::context mCtx;
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> mSocketSsl;
protected:
std::vector<uint8_t> mReadbuf;
std::list<PackedMessage::pointer> mSendQ;
PackedMessage::pointer mSendingPacket;
std::bitset<32> mPeerBits;
NewcoinAddress mPublicKey; // Public key of peer.
Peer(boost::asio::io_service& io_service);
@@ -81,6 +93,7 @@ public:
return mSocket;
}
void connect(const std::string strIp, int iPort);
void connected(const boost::system::error_code& error);
void detach();
bool samePeer(Peer::pointer p) { return samePeer(*p); }

View File

@@ -205,12 +205,12 @@ Json::Value RPCServer::doCreateFamily(Json::Value& params)
family=theApp->getWallet().addFamily(query, false);
}
if(!family.IsValid())
if(!family.isValid())
return JSONRPCError(500, "Invalid family specifier");
Json::Value ret(theApp->getWallet().getFamilyJson(family));
if(ret.isNull()) return JSONRPCError(500, "Invalid family");
if(seed.IsValid())
if(seed.isValid())
{
ret["FamilySeed"]=seed.humanFamilySeed();
}
@@ -255,7 +255,7 @@ Json::Value RPCServer::doNewAccount(Json::Value &params)
return JSONRPCError(500, "Family required");
NewcoinAddress family = parseFamily(fParam);
if(!family.IsValid()) return JSONRPCError(500, "Family not found.");
if(!family.isValid()) return JSONRPCError(500, "Family not found.");
LocalAccount::pointer account(theApp->getWallet().getNewLocalAccount(family));
if(!account)
@@ -272,7 +272,7 @@ Json::Value RPCServer::doLock(Json::Value &params)
if(extractString(fParam, params, 0))
{ // local <family>
NewcoinAddress family = parseFamily(fParam);
if(!family.IsValid()) return JSONRPCError(500, "Family not found");
if(!family.isValid()) return JSONRPCError(500, "Family not found");
theApp->getWallet().lock(family);
}
@@ -304,7 +304,7 @@ Json::Value RPCServer::doUnlock(Json::Value &params)
// pass phrase
family=theApp->getWallet().addFamily(param, false);
if(!family.IsValid())
if(!family.isValid())
return JSONRPCError(500, "Bad family");
Json::Value ret(theApp->getWallet().getFamilyJson(family));
@@ -339,7 +339,7 @@ Json::Value RPCServer::doFamilyInfo(Json::Value &params)
extractString(fParam, params, 0);
NewcoinAddress family=parseFamily(fParam);
if(!family.IsValid()) return JSONRPCError(500, "No such family");
if(!family.isValid()) return JSONRPCError(500, "No such family");
Json::Value obj(theApp->getWallet().getFamilyJson(family));
if(obj.isNull())
@@ -352,7 +352,7 @@ Json::Value RPCServer::doFamilyInfo(Json::Value &params)
int kn=boost::lexical_cast<int>(keyNum);
NewcoinAddress k=theApp->getWallet().peekKey(family, kn);
if(k.IsValid())
if(k.isValid())
{
Json::Value key(Json::objectValue);
key["Number"]=kn;
@@ -367,14 +367,30 @@ Json::Value RPCServer::doFamilyInfo(Json::Value &params)
Json::Value RPCServer::doConnect(Json::Value& params)
{
// connect <ip> [port]
std::string host, port;
std::string strIp;
int iPort = -1;
if(!extractString(host, params, 0))
return JSONRPCError(500, "Host required");
if(!extractString(port, params, 1))
port="6561";
if(!theApp->getConnectionPool().connectTo(host, port))
if(!params.isArray() || !params.size() || params.size() > 2)
return JSONRPCError(500, "Invalid parameters");
// XXX Might allow domain for manual connections.
if(!extractString(strIp, params, 0))
return JSONRPCError(500, "Host IP required");
if(params.size() == 2)
{
std::string strPort;
// YYY Should make an extract int.
if (!extractString(strPort, params, 1))
return JSONRPCError(500, "Bad port");
iPort = boost::lexical_cast<int>(strPort);
}
if(!theApp->getConnectionPool().connectTo(strIp, iPort))
return JSONRPCError(500, "Unable to connect");
return "connecting";
}
@@ -400,7 +416,7 @@ Json::Value RPCServer::doSendTo(Json::Value& params)
return JSONRPCError(500, "Invalid parameters");
NewcoinAddress destAccount = parseAccount(sDest);
if(!destAccount.IsValid())
if(!destAccount.isValid())
return JSONRPCError(500, "Unable to parse destination account");
uint64 iAmount;
@@ -713,7 +729,7 @@ Json::Value RPCServer::doCommand(const std::string& command, Json::Value& params
{
theApp->stop();
return "newcoin server stopping";
return SYSTEM_NAME " server stopping";
}
if(command=="unl_add") return doUnlAdd(params);

View File

@@ -1117,7 +1117,7 @@ void UniqueNodeList::setSeedDomains(const seedDomain& sdSource, bool bNext)
std::string strSql = str(boost::format("REPLACE INTO SeedDomains (Domain,PublicKey,Source,Next,Scan,Fetch,Sha256,Comment) VALUES (%s, %s, %s, %d, %d, %d, '%s', %s);")
% db->escape(sdSource.strDomain)
% (sdSource.naPublicKey.IsValid() ? db->escape(sdSource.naPublicKey.humanNodePublic()) : "NULL")
% (sdSource.naPublicKey.isValid() ? db->escape(sdSource.naPublicKey.humanNodePublic()) : "NULL")
% db->escape(std::string(1, static_cast<char>(sdSource.vsSource)))
% iNext
% iScan

View File

@@ -6,15 +6,13 @@
#include "../json/value.h"
#include "NewcoinAddress.h"
#include "Config.h"
#include "HttpsClient.h"
#include "ParseSection.h"
#include <boost/thread/mutex.hpp>
#include <boost/unordered_map.hpp>
#define SYSTEM_NAME "newcoin"
// Guarantees minimum thoughput of 1 node per second.
#define NODE_FETCH_JOBS 10
#define NODE_FETCH_SECONDS 10

View File

@@ -561,7 +561,7 @@ LocalAccount::pointer Wallet::parseAccount(const std::string& specifier)
; // nothing
}
return familyFound.IsValid()
return familyFound.isValid()
? getLocalAccount(familyFound, boost::lexical_cast<int>(seq))
: LocalAccount::pointer();
}

View File

@@ -225,3 +225,4 @@ message TMErrorMsg {
optional int32 errorCode = 1;
optional string message = 2;
}
// vim:ts=4