Clean up peer isConnected() and shutting down.

This commit is contained in:
Arthur Britto
2012-06-20 18:08:36 -07:00
parent b47f76a21f
commit afa128f45b
2 changed files with 60 additions and 41 deletions

View File

@@ -23,7 +23,7 @@
#define NODE_VERIFY_SECONDS 15
Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) :
mConnected(false),
mHelloed(false),
mDetaching(false),
mSocketSsl(io_service, ctx),
mVerifyTimer(io_service)
@@ -40,19 +40,22 @@ void Peer::handle_write(const boost::system::error_code& error, size_t bytes_tra
mSendingPacket = PackedMessage::pointer();
if (error)
if (mDetaching)
{
if (!mDetaching)
{
Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error;
// Ignore write requests when detatching.
nothing();
}
else if (error)
{
Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error;
detach("hw");
}
} else if (!mSendQ.empty())
detach("hw");
}
else if (!mSendQ.empty())
{
PackedMessage::pointer packet = mSendQ.front();
if(packet)
if (packet)
{
sendPacketForce(packet);
mSendQ.pop_front();
@@ -71,7 +74,6 @@ void Peer::setIpPort(const std::string& strIP, int iPort)
void Peer::detach(const char *rsn)
{
if (!mDetaching)
{
mDetaching = true; // Race is ok.
@@ -81,17 +83,10 @@ void Peer::detach(const char *rsn)
<< rsn << ": "
<< (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort();
boost::system::error_code ecCancel;
(void) mVerifyTimer.cancel();
mSendQ.clear();
// We may close more than once.
boost::system::error_code ecShutdown;
getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ecShutdown);
getSocket().close();
(void) mVerifyTimer.cancel();
mSocketSsl.async_shutdown(boost::bind(&Peer::handleShutdown, shared_from_this(), boost::asio::placeholders::error));
if (mNodePublic.isValid())
{
@@ -264,11 +259,15 @@ void Peer::connected(const boost::system::error_code& error)
void Peer::sendPacketForce(PackedMessage::pointer packet)
{
mSendingPacket = packet;
boost::asio::async_write(mSocketSsl, boost::asio::buffer(packet->getBuffer()),
boost::bind(&Peer::handle_write, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
if (!mDetaching)
{
mSendingPacket = packet;
boost::asio::async_write(mSocketSsl, boost::asio::buffer(packet->getBuffer()),
boost::bind(&Peer::handle_write, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
void Peer::sendPacket(PackedMessage::pointer packet)
@@ -288,10 +287,14 @@ void Peer::sendPacket(PackedMessage::pointer packet)
void Peer::start_read_header()
{
mReadbuf.clear();
mReadbuf.resize(HEADER_SIZE);
boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf),
boost::bind(&Peer::handle_read_header, shared_from_this(), boost::asio::placeholders::error));
if (!mDetaching)
{
mReadbuf.clear();
mReadbuf.resize(HEADER_SIZE);
boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf),
boost::bind(&Peer::handle_read_header, shared_from_this(), boost::asio::placeholders::error));
}
}
void Peer::start_read_body(unsigned msg_len)
@@ -299,15 +302,24 @@ void Peer::start_read_body(unsigned msg_len)
// m_readbuf already contains the header in its first HEADER_SIZE
// bytes. Expand it to fit in the body as well, and start async
// read into the body.
//
mReadbuf.resize(HEADER_SIZE + msg_len);
boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[HEADER_SIZE], msg_len),
boost::bind(&Peer::handle_read_body, shared_from_this(), boost::asio::placeholders::error));
if (!mDetaching)
{
mReadbuf.resize(HEADER_SIZE + msg_len);
boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[HEADER_SIZE], msg_len),
boost::bind(&Peer::handle_read_body, shared_from_this(), boost::asio::placeholders::error));
}
}
void Peer::handle_read_header(const boost::system::error_code& error)
{
if (!error)
if (mDetaching)
{
// Drop data or error if detaching.
nothing();
}
else if (!error)
{
unsigned msg_len = PackedMessage::getLength(mReadbuf);
// WRITEME: Compare to maximum message length, abort if too large
@@ -318,7 +330,7 @@ void Peer::handle_read_header(const boost::system::error_code& error)
}
start_read_body(msg_len);
}
else if (!mDetaching)
else
{
Log(lsINFO) << "Peer: Header: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error;
detach("hrh2");
@@ -327,12 +339,17 @@ void Peer::handle_read_header(const boost::system::error_code& error)
void Peer::handle_read_body(const boost::system::error_code& error)
{
if (!error)
if (mDetaching)
{
// Drop data or error if detaching.
nothing();
}
else if (!error)
{
processReadBuffer();
start_read_header();
}
else if (!mDetaching)
else
{
Log(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error;
detach("hrb");
@@ -349,7 +366,7 @@ void Peer::processReadBuffer()
// std::cerr << "Peer::processReadBuffer: " << mIpPort.first << " " << mIpPort.second << std::endl;
// If connected and get a mtHELLO or if not connected and get a non-mtHELLO, wrong message was sent.
if (mConnected == (type == newcoin::mtHELLO))
if (mHelloed == (type == newcoin::mtHELLO))
{
Log(lsWARNING) << "Wrong message type: " << type;
detach("prb1");
@@ -593,7 +610,7 @@ void Peer::recvHello(newcoin::TMHello& packet)
}
// Consider us connected. No longer accepting mtHELLO.
mConnected = true;
mHelloed = true;
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection type.)

View File

@@ -31,7 +31,7 @@ public:
private:
bool mClientConnect; // In process of connecting as client.
bool mConnected; // True, if hello accepted.
bool mHelloed; // True, if hello accepted.
bool mDetaching; // True, if detaching.
NewcoinAddress mNodePublic; // Node public key of peer.
ipPort mIpPort;
@@ -58,6 +58,8 @@ protected:
Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx);
void handleShutdown(const boost::system::error_code& error) { ; }
void handle_write(const boost::system::error_code& error, size_t bytes_transferred);
void handle_read_header(const boost::system::error_code& error);
void handle_read_body(const boost::system::error_code& error);
@@ -128,7 +130,7 @@ public:
void punishPeer(PeerPunish pp);
Json::Value getJson();
bool isConnected() const { return mConnected; }
bool isConnected() const { return mHelloed && !mDetaching; }
//static PackedMessage::pointer createFullLedger(Ledger::pointer ledger);
static PackedMessage::pointer createLedgerProposal(Ledger::pointer ledger);