From afa128f45bce4e8e87a9d37e366754a4235aa96e Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Wed, 20 Jun 2012 18:08:36 -0700 Subject: [PATCH] Clean up peer isConnected() and shutting down. --- src/Peer.cpp | 95 +++++++++++++++++++++++++++++++--------------------- src/Peer.h | 6 ++-- 2 files changed, 60 insertions(+), 41 deletions(-) diff --git a/src/Peer.cpp b/src/Peer.cpp index c30f30e452..33c51dcc3b 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -23,7 +23,7 @@ #define NODE_VERIFY_SECONDS 15 Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) : - mConnected(false), + mHelloed(false), mDetaching(false), mSocketSsl(io_service, ctx), mVerifyTimer(io_service) @@ -40,19 +40,22 @@ void Peer::handle_write(const boost::system::error_code& error, size_t bytes_tra mSendingPacket = PackedMessage::pointer(); - if (error) + if (mDetaching) { - if (!mDetaching) - { - Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error; + // Ignore write requests when detatching. + nothing(); + } + else if (error) + { + Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error; - detach("hw"); - } - - } else if (!mSendQ.empty()) + detach("hw"); + } + else if (!mSendQ.empty()) { PackedMessage::pointer packet = mSendQ.front(); - if(packet) + + if (packet) { sendPacketForce(packet); mSendQ.pop_front(); @@ -71,7 +74,6 @@ void Peer::setIpPort(const std::string& strIP, int iPort) void Peer::detach(const char *rsn) { - if (!mDetaching) { mDetaching = true; // Race is ok. @@ -81,17 +83,10 @@ void Peer::detach(const char *rsn) << rsn << ": " << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); - boost::system::error_code ecCancel; - - (void) mVerifyTimer.cancel(); - mSendQ.clear(); - // We may close more than once. - boost::system::error_code ecShutdown; - getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ecShutdown); - - getSocket().close(); + (void) mVerifyTimer.cancel(); + mSocketSsl.async_shutdown(boost::bind(&Peer::handleShutdown, shared_from_this(), boost::asio::placeholders::error)); if (mNodePublic.isValid()) { @@ -264,11 +259,15 @@ void Peer::connected(const boost::system::error_code& error) void Peer::sendPacketForce(PackedMessage::pointer packet) { - mSendingPacket = packet; - boost::asio::async_write(mSocketSsl, boost::asio::buffer(packet->getBuffer()), - boost::bind(&Peer::handle_write, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + if (!mDetaching) + { + mSendingPacket = packet; + + boost::asio::async_write(mSocketSsl, boost::asio::buffer(packet->getBuffer()), + boost::bind(&Peer::handle_write, shared_from_this(), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } } void Peer::sendPacket(PackedMessage::pointer packet) @@ -288,10 +287,14 @@ void Peer::sendPacket(PackedMessage::pointer packet) void Peer::start_read_header() { - mReadbuf.clear(); - mReadbuf.resize(HEADER_SIZE); - boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf), - boost::bind(&Peer::handle_read_header, shared_from_this(), boost::asio::placeholders::error)); + if (!mDetaching) + { + mReadbuf.clear(); + mReadbuf.resize(HEADER_SIZE); + + boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf), + boost::bind(&Peer::handle_read_header, shared_from_this(), boost::asio::placeholders::error)); + } } void Peer::start_read_body(unsigned msg_len) @@ -299,15 +302,24 @@ void Peer::start_read_body(unsigned msg_len) // m_readbuf already contains the header in its first HEADER_SIZE // bytes. Expand it to fit in the body as well, and start async // read into the body. - // - mReadbuf.resize(HEADER_SIZE + msg_len); - boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[HEADER_SIZE], msg_len), - boost::bind(&Peer::handle_read_body, shared_from_this(), boost::asio::placeholders::error)); + + if (!mDetaching) + { + mReadbuf.resize(HEADER_SIZE + msg_len); + + boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[HEADER_SIZE], msg_len), + boost::bind(&Peer::handle_read_body, shared_from_this(), boost::asio::placeholders::error)); + } } void Peer::handle_read_header(const boost::system::error_code& error) { - if (!error) + if (mDetaching) + { + // Drop data or error if detaching. + nothing(); + } + else if (!error) { unsigned msg_len = PackedMessage::getLength(mReadbuf); // WRITEME: Compare to maximum message length, abort if too large @@ -318,7 +330,7 @@ void Peer::handle_read_header(const boost::system::error_code& error) } start_read_body(msg_len); } - else if (!mDetaching) + else { Log(lsINFO) << "Peer: Header: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; detach("hrh2"); @@ -327,12 +339,17 @@ void Peer::handle_read_header(const boost::system::error_code& error) void Peer::handle_read_body(const boost::system::error_code& error) { - if (!error) + if (mDetaching) + { + // Drop data or error if detaching. + nothing(); + } + else if (!error) { processReadBuffer(); start_read_header(); } - else if (!mDetaching) + else { Log(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; detach("hrb"); @@ -349,7 +366,7 @@ void Peer::processReadBuffer() // std::cerr << "Peer::processReadBuffer: " << mIpPort.first << " " << mIpPort.second << std::endl; // If connected and get a mtHELLO or if not connected and get a non-mtHELLO, wrong message was sent. - if (mConnected == (type == newcoin::mtHELLO)) + if (mHelloed == (type == newcoin::mtHELLO)) { Log(lsWARNING) << "Wrong message type: " << type; detach("prb1"); @@ -593,7 +610,7 @@ void Peer::recvHello(newcoin::TMHello& packet) } // Consider us connected. No longer accepting mtHELLO. - mConnected = true; + mHelloed = true; // XXX Set timer: connection is in grace period to be useful. // XXX Set timer: connection idle (idle may vary depending on connection type.) diff --git a/src/Peer.h b/src/Peer.h index f4ecc82d30..3150532efa 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -31,7 +31,7 @@ public: private: bool mClientConnect; // In process of connecting as client. - bool mConnected; // True, if hello accepted. + bool mHelloed; // True, if hello accepted. bool mDetaching; // True, if detaching. NewcoinAddress mNodePublic; // Node public key of peer. ipPort mIpPort; @@ -58,6 +58,8 @@ protected: Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx); + void handleShutdown(const boost::system::error_code& error) { ; } + void handle_write(const boost::system::error_code& error, size_t bytes_transferred); void handle_read_header(const boost::system::error_code& error); void handle_read_body(const boost::system::error_code& error); @@ -128,7 +130,7 @@ public: void punishPeer(PeerPunish pp); Json::Value getJson(); - bool isConnected() const { return mConnected; } + bool isConnected() const { return mHelloed && !mDetaching; } //static PackedMessage::pointer createFullLedger(Ledger::pointer ledger); static PackedMessage::pointer createLedgerProposal(Ledger::pointer ledger);