diff --git a/include/xrpl/proto/ripple.proto b/include/xrpl/proto/ripple.proto index f93ebbc72c..6758693ec7 100644 --- a/include/xrpl/proto/ripple.proto +++ b/include/xrpl/proto/ripple.proto @@ -26,6 +26,7 @@ enum MessageType { mtREPLAY_DELTA_RESPONSE = 60; mtHAVE_TRANSACTIONS = 63; mtTRANSACTIONS = 64; + mtCLOSE = 65; } // token, iterations, target, challenge = issue demand for proof of work @@ -341,3 +342,19 @@ message TMReplayDeltaResponse { message TMHaveTransactions { repeated bytes hashes = 1; } + +enum TMCloseReason { + crRESOURCE = 1; + crINVALID_CLOSED_LEDGER = 2; + crINVALID_PREV_LEDGER = 3; + crBAD_LEDGER_HEADERS = 4; + crLARGE_SEND_QUEUE = 5; + crNOT_USEFUL = 6; + crPING_TIMEOUT = 7; + crINTERNAL = 8; + crSHUTDOWN = 9; +} + +message TMClose { + required TMCloseReason reason = 1; +} \ No newline at end of file diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 69f25e1eb4..e059cf46db 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -64,6 +64,34 @@ std::chrono::seconds constexpr peerTimerInterval{60}; // TODO: Remove this exclusion once unit tests are added after the hotfix // release. +std::string +to_string(protocol::TMCloseReason reason) +{ + switch (reason) + { + case protocol::crRESOURCE: + return "Too Many P2P Messages"; + case protocol::crINVALID_CLOSED_LEDGER: + return "Invalid Closed Ledger Header"; + case protocol::crINVALID_PREV_LEDGER: + return "Invalid Previous Ledger Header"; + case protocol::crBAD_LEDGER_HEADERS: + return "Bad Ledger Headers"; + case protocol::crLARGE_SEND_QUEUE: + return "Large Send Queue"; + case protocol::crNOT_USEFUL: + return "Peer Not Useful"; + case protocol::crPING_TIMEOUT: + return "Ping Timeout"; + case protocol::crINTERNAL: + return "Internal Error"; + case protocol::crSHUTDOWN: + return "Shutdown"; + } + + return "unknown"; +} + PeerImp::PeerImp( Application& app, id_t id, @@ -178,7 +206,7 @@ PeerImp::run() closed = parseLedgerHash(iter->value()); if (!closed) - fail("Malformed handshake data (1)"); + fail(protocol::TMCloseReason::crINVALID_CLOSED_LEDGER); } if (auto const iter = headers_.find("Previous-Ledger"); @@ -187,11 +215,11 @@ PeerImp::run() previous = parseLedgerHash(iter->value()); if (!previous) - fail("Malformed handshake data (2)"); + fail(protocol::TMCloseReason::crINVALID_PREV_LEDGER); } if (previous && !closed) - fail("Malformed handshake data (3)"); + fail(protocol::TMCloseReason::crBAD_LEDGER_HEADERS); { std::lock_guard sl(recentLock_); @@ -231,7 +259,8 @@ PeerImp::stop() JLOG(journal_.info()) << "Stop"; } } - close(); + + sendAndClose(protocol::TMCloseReason::crSHUTDOWN); } //------------------------------------------------------------------------------ @@ -241,10 +270,6 @@ PeerImp::send(std::shared_ptr const& m) { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m)); - if (gracefulClose_) - return; - if (detaching_) - return; auto validator = m->getValidatorKey(); if (validator && !squelch_.expireSquelch(*validator)) @@ -356,7 +381,7 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context) { // Sever the connection overlay_.incPeerDisconnectCharges(); - fail("charge: Resources"); + fail(protocol::TMCloseReason::crRESOURCE); } } @@ -580,7 +605,6 @@ PeerImp::close() "ripple::PeerImp::close : strand in this thread"); if (socket_.is_open()) { - detaching_ = true; // DEPRECATED try { timer_.cancel(); @@ -604,22 +628,25 @@ PeerImp::close() } void -PeerImp::fail(std::string const& reason) +PeerImp::fail(protocol::TMCloseReason reason) { if (!strand_.running_in_this_thread()) return post( strand_, std::bind( - (void(Peer::*)(std::string const&)) & PeerImp::fail, + (void(Peer::*)(protocol::TMCloseReason)) & PeerImp::fail, shared_from_this(), reason)); - if (journal_.active(beast::severities::kWarning) && socket_.is_open()) + + if (journal_.active(beast::severities::kWarning) && socket_.is_open() && + reason != protocol::TMCloseReason::crINTERNAL) { std::string const n = name(); JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n) - << " failed: " << reason; + << " failed: " << to_string(reason); } - close(); + + sendAndClose(reason); } void @@ -634,28 +661,28 @@ PeerImp::fail(std::string const& name, error_code ec) << name << " from " << toBase58(TokenType::NodePublic, publicKey_) << " at " << remote_address_.to_string() << ": " << ec.message(); } - close(); + + sendAndClose(protocol::TMCloseReason::crINTERNAL); } void -PeerImp::gracefulClose() +PeerImp::sendAndClose(protocol::TMCloseReason reason) { - XRPL_ASSERT( - strand_.running_in_this_thread(), - "ripple::PeerImp::gracefulClose : strand in this thread"); - XRPL_ASSERT( - socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open"); - XRPL_ASSERT( - !gracefulClose_, - "ripple::PeerImp::gracefulClose : socket is not closing"); - gracefulClose_ = true; - if (send_queue_.size() > 0) + if (shutdown_) return; - setTimer(); - stream_.async_shutdown(bind_executor( - strand_, - std::bind( - &PeerImp::onShutdown, shared_from_this(), std::placeholders::_1))); + + // erase all outstanding messages except for the one + // currently being executed + if (send_queue_.size() > 1) + { + decltype(send_queue_) q({send_queue_.front()}); + send_queue_.swap(q); + } + + shutdown_ = true; + protocol::TMClose tmGC; + tmGC.set_reason(reason); + send(std::make_shared(tmGC, protocol::mtCLOSE)); } void @@ -713,14 +740,11 @@ PeerImp::onTimer(error_code const& ec) { // This should never happen JLOG(journal_.error()) << "onTimer: " << ec.message(); - return close(); + return fail(protocol::TMCloseReason::crINTERNAL); } if (large_sendq_++ >= Tuning::sendqIntervals) - { - fail("Large send queue"); - return; - } + return fail(protocol::TMCloseReason::crLARGE_SEND_QUEUE); if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged) { @@ -737,17 +761,13 @@ PeerImp::onTimer(error_code const& ec) (duration > app_.config().MAX_UNKNOWN_TIME))) { overlay_.peerFinder().on_failure(slot_); - fail("Not useful"); - return; + return fail(protocol::TMCloseReason::crNOT_USEFUL); } } // Already waiting for PONG if (lastPingSeq_) - { - fail("Ping Timeout"); - return; - } + return fail(protocol::TMCloseReason::crPING_TIMEOUT); lastPingTime_ = clock_type::now(); lastPingSeq_ = rand_int(); @@ -761,21 +781,6 @@ PeerImp::onTimer(error_code const& ec) setTimer(); } -void -PeerImp::onShutdown(error_code ec) -{ - cancelTimer(); - // If we don't get eof then something went wrong - if (!ec) - { - JLOG(journal_.error()) << "onShutdown: expected error condition"; - return close(); - } - if (ec != boost::asio::error::eof) - return fail("onShutdown", ec); - close(); -} - //------------------------------------------------------------------------------ void PeerImp::doAccept() @@ -791,7 +796,10 @@ PeerImp::doAccept() // This shouldn't fail since we already computed // the shared value successfully in OverlayImpl if (!sharedValue) - return fail("makeSharedValue: Unexpected failure"); + { + JLOG(journal_.error()) << "doAccept: makeSharedValue failed"; + return fail(protocol::TMCloseReason::crINTERNAL); + } JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); JLOG(journal_.info()) << "Public Key: " @@ -841,7 +849,9 @@ PeerImp::doAccept() return fail("onWriteResponse", ec); if (write_buffer->size() == bytes_transferred) return doProtocolStart(); - return fail("Failed to write header"); + + JLOG(journal_.error()) << "Failed to write header"; + return fail(protocol::TMCloseReason::crINTERNAL); })); } @@ -905,15 +915,19 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) { if (!socket_.is_open()) return; - if (ec == boost::asio::error::operation_aborted) + + // we started closing the local connection, stop reading + if (ec == boost::asio::error::operation_aborted || shutdown_) return; if (ec == boost::asio::error::eof) { + // Peer initiated connection close, just clean up JLOG(journal_.info()) << "EOF"; - return gracefulClose(); + return close(); } if (ec) return fail("onReadMessage", ec); + if (auto stream = journal_.trace()) { if (bytes_transferred > 0) @@ -945,8 +959,6 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) return fail("onReadMessage", ec); if (!socket_.is_open()) return; - if (gracefulClose_) - return; if (bytes_consumed == 0) break; read_buffer_.consume(bytes_consumed); @@ -969,6 +981,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) { if (!socket_.is_open()) return; + if (ec == boost::asio::error::operation_aborted) return; if (ec) @@ -1002,16 +1015,9 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) std::placeholders::_1, std::placeholders::_2))); } - - if (gracefulClose_) - { - return stream_.async_shutdown(bind_executor( - strand_, - std::bind( - &PeerImp::onShutdown, - shared_from_this(), - std::placeholders::_1))); - } + // If the send queue is empty and we are shutting down, close the connection + else if (shutdown_) + close(); } //------------------------------------------------------------------------------ @@ -2746,6 +2752,20 @@ PeerImp::onMessage(std::shared_ptr const& m) << "onMessage: TMSquelch " << slice << " " << id() << " " << duration; } +void +PeerImp::onMessage(std::shared_ptr const& m) +{ + if (m->has_reason()) + { + JLOG(p_journal_.debug()) + << "onMessage: TMClose: peer closed the connection: " + << to_string(m->reason()); + } + + // do not send a close message when the peer initiates the shutdown + close(); +} + //-------------------------------------------------------------------------- void diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index 3d9a0c0b1e..88aae1ea7b 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -95,7 +95,7 @@ private: std::atomic tracking_; clock_type::time_point trackingTime_; - bool detaching_ = false; + bool shutdown_ = false; // Node public key of peer. PublicKey const publicKey_; std::string name_; @@ -175,7 +175,6 @@ private: http_response_type response_; boost::beast::http::fields const& headers_; std::queue> send_queue_; - bool gracefulClose_ = false; int large_sendq_ = 0; std::unique_ptr load_event_; // The highest sequence of each PublisherList that has @@ -426,7 +425,7 @@ public: isHighLatency() const override; void - fail(std::string const& reason); + fail(protocol::TMCloseReason reason); bool compressionEnabled() const override @@ -445,10 +444,10 @@ private: close(); void - fail(std::string const& name, error_code ec); + sendAndClose(protocol::TMCloseReason reason); void - gracefulClose(); + fail(std::string const& name, error_code ec); void setTimer(); @@ -463,10 +462,6 @@ private: void onTimer(boost::system::error_code const& ec); - // Called when SSL shutdown completes - void - onShutdown(error_code ec); - void doAccept(); @@ -584,6 +579,8 @@ public: onMessage(std::shared_ptr const& m); void onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); private: //-------------------------------------------------------------------------- diff --git a/src/xrpld/overlay/detail/ProtocolMessage.h b/src/xrpld/overlay/detail/ProtocolMessage.h index 26c83055a3..16de06f2a3 100644 --- a/src/xrpld/overlay/detail/ProtocolMessage.h +++ b/src/xrpld/overlay/detail/ProtocolMessage.h @@ -104,8 +104,8 @@ protocolMessageName(int type) return "replay_delta_request"; case protocol::mtREPLAY_DELTA_RESPONSE: return "replay_delta_response"; - default: - break; + case protocol::mtCLOSE: + return "close"; } return "unknown"; } @@ -470,6 +470,10 @@ invokeProtocolMessage( success = detail::invoke( *header, buffers, handler); break; + case protocol::mtCLOSE: + success = + detail::invoke(*header, buffers, handler); + break; default: handler.onMessageUnknown(header->message_type); success = true; diff --git a/src/xrpld/overlay/detail/TrafficCount.cpp b/src/xrpld/overlay/detail/TrafficCount.cpp index a2234a432e..ecd67bc93c 100644 --- a/src/xrpld/overlay/detail/TrafficCount.cpp +++ b/src/xrpld/overlay/detail/TrafficCount.cpp @@ -46,7 +46,7 @@ std::unordered_map const {protocol::mtTRANSACTIONS, TrafficCount::category::requested_transactions}, {protocol::mtSQUELCH, TrafficCount::category::squelch}, -}; + {protocol::mtCLOSE, TrafficCount::category::close}}; TrafficCount::category TrafficCount::categorize( diff --git a/src/xrpld/overlay/detail/TrafficCount.h b/src/xrpld/overlay/detail/TrafficCount.h index 8dc02def5f..647abf576b 100644 --- a/src/xrpld/overlay/detail/TrafficCount.h +++ b/src/xrpld/overlay/detail/TrafficCount.h @@ -195,7 +195,7 @@ public: // The total p2p bytes sent and received on the wire total, - + close, unknown // must be last }; @@ -304,6 +304,7 @@ public: {replay_delta_response, "replay_delta_response"}, {have_transactions, "have_transactions"}, {requested_transactions, "requested_transactions"}, + {close, "close"}, {total, "total"}}; if (auto it = category_map.find(cat); it != category_map.end()) @@ -370,6 +371,7 @@ protected: {have_transactions, {have_transactions}}, {requested_transactions, {requested_transactions}}, {total, {total}}, + {close, {close}}, {unknown, {unknown}}, }; };