More clean up of connection logic.

This commit is contained in:
Arthur Britto
2012-06-20 15:52:59 -07:00
parent 89d8b74547
commit efa38ea72b
3 changed files with 134 additions and 107 deletions

View File

@@ -24,6 +24,7 @@
Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) :
mConnected(false),
mDetaching(false),
mSocketSsl(io_service, ctx),
mVerifyTimer(io_service)
{
@@ -32,8 +33,6 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx)
void Peer::handle_write(const boost::system::error_code& error, size_t bytes_transferred)
{
if (error)
Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error;
#ifdef DEBUG
// if (!error)
// std::cerr << "Peer::handle_write bytes: "<< bytes_transferred << std::endl;
@@ -43,11 +42,14 @@ void Peer::handle_write(const boost::system::error_code& error, size_t bytes_tra
if (error)
{
detach("hw");
return;
}
if (!mDetaching)
{
Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error;
if (!mSendQ.empty())
detach("hw");
}
} else if (!mSendQ.empty())
{
PackedMessage::pointer packet = mSendQ.front();
if(packet)
@@ -69,43 +71,49 @@ void Peer::setIpPort(const std::string& strIP, int iPort)
void Peer::detach(const char *rsn)
{
Log(lsDEBUG) << "Peer: Detach: "
<< ADDRESS(this) << "> "
<< rsn << ": "
<< (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort();
boost::system::error_code ecCancel;
(void) mVerifyTimer.cancel();
mSendQ.clear();
// We may close more than once.
boost::system::error_code ecShutdown;
getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ecShutdown);
getSocket().close();
if (mNodePublic.isValid())
if (!mDetaching)
{
theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic);
mDetaching = true; // Race is ok.
mNodePublic.clear(); // Be idompotent.
Log(lsDEBUG) << "Peer: Detach: "
<< ADDRESS(this) << "> "
<< rsn << ": "
<< (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort();
boost::system::error_code ecCancel;
(void) mVerifyTimer.cancel();
mSendQ.clear();
// We may close more than once.
boost::system::error_code ecShutdown;
getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ecShutdown);
getSocket().close();
if (mNodePublic.isValid())
{
theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic);
mNodePublic.clear(); // Be idompotent.
}
if (!mIpPort.first.empty())
{
// Connection might be part of scanning. Inform connect failed.
// Might need to scan. Inform connection closed.
theApp->getConnectionPool().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second);
mIpPort.first.clear(); // Be idompotent.
}
Log(lsDEBUG) << "Peer: Detach: "
<< ADDRESS(this) << "< "
<< rsn << ": "
<< (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort();
}
if (!mIpPort.first.empty())
{
// Connection might be part of scanning. Inform connect failed.
// Might need to scan. Inform connection closed.
theApp->getConnectionPool().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second);
mIpPort.first.clear(); // Be idompotent.
}
Log(lsDEBUG) << "Peer: Detach: "
<< ADDRESS(this) << "< "
<< rsn << ": "
<< (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort();
}
void Peer::handleVerifyTimer(const boost::system::error_code& ecResult)
@@ -235,12 +243,7 @@ void Peer::connected(const boost::system::error_code& error)
if (iPort == SYSTEM_PEER_PORT) //TODO: Why are you doing this?
iPort = -1;
if (error)
{
Log(lsINFO) << "Peer: Inbound: Error: " << ADDRESS(this) << ": " << strIp << " " << iPort << " : " << error.category().name() << ": " << error.message() << ": " << error;
detach("ctd");
}
else
if (!error)
{
// Not redundant ip and port, handshake, and start.
@@ -251,6 +254,12 @@ void Peer::connected(const boost::system::error_code& error)
mSocketSsl.async_handshake(boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::server,
boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error));
}
else if (!mDetaching)
{
Log(lsINFO) << "Peer: Inbound: Error: " << ADDRESS(this) << ": " << strIp << " " << iPort << " : " << error.category().name() << ": " << error.message() << ": " << error;
detach("ctd");
}
}
void Peer::sendPacketForce(PackedMessage::pointer packet)
@@ -309,7 +318,7 @@ void Peer::handle_read_header(const boost::system::error_code& error)
}
start_read_body(msg_len);
}
else
else if (!mDetaching)
{
Log(lsINFO) << "Peer: Header: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error;
detach("hrh2");
@@ -323,7 +332,7 @@ void Peer::handle_read_body(const boost::system::error_code& error)
processReadBuffer();
start_read_header();
}
else
else if (!mDetaching)
{
Log(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error;
detach("hrb");
@@ -342,7 +351,7 @@ void Peer::processReadBuffer()
// If connected and get a mtHELLO or if not connected and get a non-mtHELLO, wrong message was sent.
if (mConnected == (type == newcoin::mtHELLO))
{
std::cerr << "Wrong message type: " << type << std::endl;
Log(lsWARNING) << "Wrong message type: " << type;
detach("prb1");
}
else
@@ -542,6 +551,9 @@ void Peer::recvHello(newcoin::TMHello& packet)
#endif
bool bDetach = true;
// Cancel verification timeout.
(void) mVerifyTimer.cancel();
if (!mNodePublic.setNodePublic(packet.nodepublic()))
{
Log(lsINFO) << "Recv(Hello): Disconnect: Bad node public key.";
@@ -550,50 +562,53 @@ void Peer::recvHello(newcoin::TMHello& packet)
{ // Unable to verify they have private key for claimed public key.
Log(lsINFO) << "Recv(Hello): Disconnect: Failed to verify session.";
}
else if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort()))
{ // Already connected, self, or some other reason.
Log(lsINFO) << "Recv(Hello): Disconnect: Extraneous connection.";
}
else
{ // Successful connection.
Log(lsINFO) << "Recv(Hello): Connect: " << mNodePublic.humanNodePublic();
// Cancel verification timeout.
(void) mVerifyTimer.cancel();
if (mClientConnect)
{
// If we connected due to scan, no longer need to scan.
theApp->getConnectionPool().peerVerified(shared_from_this());
}
// No longer connecting as client.
mClientConnect = false;
if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort()))
{ // Already connected, self, or some other reason.
Log(lsINFO) << "Recv(Hello): Disconnect: Extraneous connection.";
}
else
{
// Take a guess at remotes address.
std::string strIP = getSocket().remote_endpoint().address().to_string();
int iPort = packet.ipv4port();
if (mClientConnect)
{
// No longer connecting as client.
mClientConnect = false;
}
else
{
// Take a guess at remotes address.
std::string strIP = getSocket().remote_endpoint().address().to_string();
int iPort = packet.ipv4port();
theApp->getConnectionPool().savePeer(strIP, iPort, UniqueNodeList::vsInbound);
theApp->getConnectionPool().savePeer(strIP, iPort, UniqueNodeList::vsInbound);
}
// Consider us connected. No longer accepting mtHELLO.
mConnected = true;
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection type.)
if ((packet.has_closedledger()) && (packet.closedledger().size() == (256 / 8)))
{
memcpy(mClosedLedgerHash.begin(), packet.closedledger().data(), 256 / 8);
if ((packet.has_previousledger()) && (packet.previousledger().size() == (256 / 8)))
memcpy(mPreviousLedgerHash.begin(), packet.previousledger().data(), 256 / 8);
else mPreviousLedgerHash.zero();
mClosedLedgerTime = boost::posix_time::second_clock::universal_time();
}
bDetach = false;
}
// Consider us connected. No longer accepting mtHELLO.
mConnected = true;
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection type.)
if ((packet.has_closedledger()) && (packet.closedledger().size() == (256 / 8)))
{
memcpy(mClosedLedgerHash.begin(), packet.closedledger().data(), 256 / 8);
if ((packet.has_previousledger()) && (packet.previousledger().size() == (256 / 8)))
memcpy(mPreviousLedgerHash.begin(), packet.previousledger().data(), 256 / 8);
else mPreviousLedgerHash.zero();
mClosedLedgerTime = boost::posix_time::second_clock::universal_time();
}
bDetach = false;
}
if (bDetach)