mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-04 11:15:56 +00:00
Compare commits
2 Commits
415a412d42
...
tapanito/f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a15abd4067 | ||
|
|
d9695be838 |
@@ -26,6 +26,7 @@ enum MessageType {
|
||||
mtREPLAY_DELTA_RESPONSE = 60;
|
||||
mtHAVE_TRANSACTIONS = 63;
|
||||
mtTRANSACTIONS = 64;
|
||||
mtCLOSE = 65;
|
||||
}
|
||||
|
||||
// token, iterations, target, challenge = issue demand for proof of work
|
||||
@@ -341,3 +342,19 @@ message TMReplayDeltaResponse {
|
||||
message TMHaveTransactions {
|
||||
repeated bytes hashes = 1;
|
||||
}
|
||||
|
||||
enum TMCloseReason {
|
||||
crRESOURCE = 1;
|
||||
crINVALID_CLOSED_LEDGER = 2;
|
||||
crINVALID_PREV_LEDGER = 3;
|
||||
crBAD_LEDGER_HEADERS = 4;
|
||||
crLARGE_SEND_QUEUE = 5;
|
||||
crNOT_USEFUL = 6;
|
||||
crPING_TIMEOUT = 7;
|
||||
crINTERNAL = 8;
|
||||
crSHUTDOWN = 9;
|
||||
}
|
||||
|
||||
message TMClose {
|
||||
required TMCloseReason reason = 1;
|
||||
}
|
||||
|
||||
@@ -64,6 +64,34 @@ std::chrono::seconds constexpr peerTimerInterval{60};
|
||||
// TODO: Remove this exclusion once unit tests are added after the hotfix
|
||||
// release.
|
||||
|
||||
std::string
|
||||
to_string(protocol::TMCloseReason reason)
|
||||
{
|
||||
switch (reason)
|
||||
{
|
||||
case protocol::crRESOURCE:
|
||||
return "Too Many P2P Messages";
|
||||
case protocol::crINVALID_CLOSED_LEDGER:
|
||||
return "Invalid Closed Ledger Header";
|
||||
case protocol::crINVALID_PREV_LEDGER:
|
||||
return "Invalid Previous Ledger Header";
|
||||
case protocol::crBAD_LEDGER_HEADERS:
|
||||
return "Bad Ledger Headers";
|
||||
case protocol::crLARGE_SEND_QUEUE:
|
||||
return "Large Send Queue";
|
||||
case protocol::crNOT_USEFUL:
|
||||
return "Peer Not Useful";
|
||||
case protocol::crPING_TIMEOUT:
|
||||
return "Ping Timeout";
|
||||
case protocol::crINTERNAL:
|
||||
return "Internal Error";
|
||||
case protocol::crSHUTDOWN:
|
||||
return "Shutdown";
|
||||
}
|
||||
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
PeerImp::PeerImp(
|
||||
Application& app,
|
||||
id_t id,
|
||||
@@ -178,7 +206,7 @@ PeerImp::run()
|
||||
closed = parseLedgerHash(iter->value());
|
||||
|
||||
if (!closed)
|
||||
fail("Malformed handshake data (1)");
|
||||
fail(protocol::TMCloseReason::crINVALID_CLOSED_LEDGER);
|
||||
}
|
||||
|
||||
if (auto const iter = headers_.find("Previous-Ledger");
|
||||
@@ -187,11 +215,11 @@ PeerImp::run()
|
||||
previous = parseLedgerHash(iter->value());
|
||||
|
||||
if (!previous)
|
||||
fail("Malformed handshake data (2)");
|
||||
fail(protocol::TMCloseReason::crINVALID_PREV_LEDGER);
|
||||
}
|
||||
|
||||
if (previous && !closed)
|
||||
fail("Malformed handshake data (3)");
|
||||
fail(protocol::TMCloseReason::crBAD_LEDGER_HEADERS);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> sl(recentLock_);
|
||||
@@ -231,7 +259,8 @@ PeerImp::stop()
|
||||
JLOG(journal_.info()) << "Stop";
|
||||
}
|
||||
}
|
||||
close();
|
||||
|
||||
sendAndClose(protocol::TMCloseReason::crSHUTDOWN);
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -241,10 +270,6 @@ PeerImp::send(std::shared_ptr<Message> const& m)
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
|
||||
if (gracefulClose_)
|
||||
return;
|
||||
if (detaching_)
|
||||
return;
|
||||
|
||||
auto validator = m->getValidatorKey();
|
||||
if (validator && !squelch_.expireSquelch(*validator))
|
||||
@@ -356,7 +381,7 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
|
||||
{
|
||||
// Sever the connection
|
||||
overlay_.incPeerDisconnectCharges();
|
||||
fail("charge: Resources");
|
||||
fail(protocol::TMCloseReason::crRESOURCE);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -580,7 +605,6 @@ PeerImp::close()
|
||||
"ripple::PeerImp::close : strand in this thread");
|
||||
if (socket_.is_open())
|
||||
{
|
||||
detaching_ = true; // DEPRECATED
|
||||
try
|
||||
{
|
||||
timer_.cancel();
|
||||
@@ -604,22 +628,25 @@ PeerImp::close()
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::fail(std::string const& reason)
|
||||
PeerImp::fail(protocol::TMCloseReason reason)
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
return post(
|
||||
strand_,
|
||||
std::bind(
|
||||
(void(Peer::*)(std::string const&)) & PeerImp::fail,
|
||||
(void(Peer::*)(protocol::TMCloseReason)) & PeerImp::fail,
|
||||
shared_from_this(),
|
||||
reason));
|
||||
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
|
||||
|
||||
if (journal_.active(beast::severities::kWarning) && socket_.is_open() &&
|
||||
reason != protocol::TMCloseReason::crINTERNAL)
|
||||
{
|
||||
std::string const n = name();
|
||||
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
|
||||
<< " failed: " << reason;
|
||||
<< " failed: " << to_string(reason);
|
||||
}
|
||||
close();
|
||||
|
||||
sendAndClose(reason);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -634,28 +661,28 @@ PeerImp::fail(std::string const& name, error_code ec)
|
||||
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
|
||||
<< " at " << remote_address_.to_string() << ": " << ec.message();
|
||||
}
|
||||
close();
|
||||
|
||||
sendAndClose(protocol::TMCloseReason::crINTERNAL);
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::gracefulClose()
|
||||
PeerImp::sendAndClose(protocol::TMCloseReason reason)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::gracefulClose : strand in this thread");
|
||||
XRPL_ASSERT(
|
||||
socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
|
||||
XRPL_ASSERT(
|
||||
!gracefulClose_,
|
||||
"ripple::PeerImp::gracefulClose : socket is not closing");
|
||||
gracefulClose_ = true;
|
||||
if (send_queue_.size() > 0)
|
||||
if (shutdown_)
|
||||
return;
|
||||
setTimer();
|
||||
stream_.async_shutdown(bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
|
||||
|
||||
// erase all outstanding messages except for the one
|
||||
// currently being executed
|
||||
if (send_queue_.size() > 1)
|
||||
{
|
||||
decltype(send_queue_) q({send_queue_.front()});
|
||||
send_queue_.swap(q);
|
||||
}
|
||||
|
||||
shutdown_ = true;
|
||||
protocol::TMClose tmGC;
|
||||
tmGC.set_reason(reason);
|
||||
send(std::make_shared<Message>(tmGC, protocol::mtCLOSE));
|
||||
}
|
||||
|
||||
void
|
||||
@@ -713,14 +740,11 @@ PeerImp::onTimer(error_code const& ec)
|
||||
{
|
||||
// This should never happen
|
||||
JLOG(journal_.error()) << "onTimer: " << ec.message();
|
||||
return close();
|
||||
return fail(protocol::TMCloseReason::crINTERNAL);
|
||||
}
|
||||
|
||||
if (large_sendq_++ >= Tuning::sendqIntervals)
|
||||
{
|
||||
fail("Large send queue");
|
||||
return;
|
||||
}
|
||||
return fail(protocol::TMCloseReason::crLARGE_SEND_QUEUE);
|
||||
|
||||
if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
|
||||
{
|
||||
@@ -737,17 +761,13 @@ PeerImp::onTimer(error_code const& ec)
|
||||
(duration > app_.config().MAX_UNKNOWN_TIME)))
|
||||
{
|
||||
overlay_.peerFinder().on_failure(slot_);
|
||||
fail("Not useful");
|
||||
return;
|
||||
return fail(protocol::TMCloseReason::crNOT_USEFUL);
|
||||
}
|
||||
}
|
||||
|
||||
// Already waiting for PONG
|
||||
if (lastPingSeq_)
|
||||
{
|
||||
fail("Ping Timeout");
|
||||
return;
|
||||
}
|
||||
return fail(protocol::TMCloseReason::crPING_TIMEOUT);
|
||||
|
||||
lastPingTime_ = clock_type::now();
|
||||
lastPingSeq_ = rand_int<std::uint32_t>();
|
||||
@@ -761,21 +781,6 @@ PeerImp::onTimer(error_code const& ec)
|
||||
setTimer();
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::onShutdown(error_code ec)
|
||||
{
|
||||
cancelTimer();
|
||||
// If we don't get eof then something went wrong
|
||||
if (!ec)
|
||||
{
|
||||
JLOG(journal_.error()) << "onShutdown: expected error condition";
|
||||
return close();
|
||||
}
|
||||
if (ec != boost::asio::error::eof)
|
||||
return fail("onShutdown", ec);
|
||||
close();
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
void
|
||||
PeerImp::doAccept()
|
||||
@@ -791,7 +796,10 @@ PeerImp::doAccept()
|
||||
// This shouldn't fail since we already computed
|
||||
// the shared value successfully in OverlayImpl
|
||||
if (!sharedValue)
|
||||
return fail("makeSharedValue: Unexpected failure");
|
||||
{
|
||||
JLOG(journal_.error()) << "doAccept: makeSharedValue failed";
|
||||
return fail(protocol::TMCloseReason::crINTERNAL);
|
||||
}
|
||||
|
||||
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
|
||||
JLOG(journal_.info()) << "Public Key: "
|
||||
@@ -841,7 +849,9 @@ PeerImp::doAccept()
|
||||
return fail("onWriteResponse", ec);
|
||||
if (write_buffer->size() == bytes_transferred)
|
||||
return doProtocolStart();
|
||||
return fail("Failed to write header");
|
||||
|
||||
JLOG(journal_.error()) << "Failed to write header";
|
||||
return fail(protocol::TMCloseReason::crINTERNAL);
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -905,15 +915,19 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
|
||||
// we started closing the local connection, stop reading
|
||||
if (ec == boost::asio::error::operation_aborted || shutdown_)
|
||||
return;
|
||||
if (ec == boost::asio::error::eof)
|
||||
{
|
||||
// Peer initiated connection close, just clean up
|
||||
JLOG(journal_.info()) << "EOF";
|
||||
return gracefulClose();
|
||||
return close();
|
||||
}
|
||||
if (ec)
|
||||
return fail("onReadMessage", ec);
|
||||
|
||||
if (auto stream = journal_.trace())
|
||||
{
|
||||
if (bytes_transferred > 0)
|
||||
@@ -945,8 +959,6 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
||||
return fail("onReadMessage", ec);
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
if (gracefulClose_)
|
||||
return;
|
||||
if (bytes_consumed == 0)
|
||||
break;
|
||||
read_buffer_.consume(bytes_consumed);
|
||||
@@ -969,6 +981,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
if (ec)
|
||||
@@ -1002,16 +1015,9 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
}
|
||||
|
||||
if (gracefulClose_)
|
||||
{
|
||||
return stream_.async_shutdown(bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onShutdown,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1)));
|
||||
}
|
||||
// If the send queue is empty and we are shutting down, close the connection
|
||||
else if (shutdown_)
|
||||
close();
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -2746,6 +2752,20 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
|
||||
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMClose> const& m)
|
||||
{
|
||||
if (m->has_reason())
|
||||
{
|
||||
JLOG(p_journal_.debug())
|
||||
<< "onMessage: TMClose: peer closed the connection: "
|
||||
<< to_string(m->reason());
|
||||
}
|
||||
|
||||
// do not send a close message when the peer initiates the shutdown
|
||||
close();
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
void
|
||||
|
||||
@@ -95,7 +95,7 @@ private:
|
||||
|
||||
std::atomic<Tracking> tracking_;
|
||||
clock_type::time_point trackingTime_;
|
||||
bool detaching_ = false;
|
||||
bool shutdown_ = false;
|
||||
// Node public key of peer.
|
||||
PublicKey const publicKey_;
|
||||
std::string name_;
|
||||
@@ -175,7 +175,6 @@ private:
|
||||
http_response_type response_;
|
||||
boost::beast::http::fields const& headers_;
|
||||
std::queue<std::shared_ptr<Message>> send_queue_;
|
||||
bool gracefulClose_ = false;
|
||||
int large_sendq_ = 0;
|
||||
std::unique_ptr<LoadEvent> load_event_;
|
||||
// The highest sequence of each PublisherList that has
|
||||
@@ -426,7 +425,7 @@ public:
|
||||
isHighLatency() const override;
|
||||
|
||||
void
|
||||
fail(std::string const& reason);
|
||||
fail(protocol::TMCloseReason reason);
|
||||
|
||||
bool
|
||||
compressionEnabled() const override
|
||||
@@ -445,10 +444,10 @@ private:
|
||||
close();
|
||||
|
||||
void
|
||||
fail(std::string const& name, error_code ec);
|
||||
sendAndClose(protocol::TMCloseReason reason);
|
||||
|
||||
void
|
||||
gracefulClose();
|
||||
fail(std::string const& name, error_code ec);
|
||||
|
||||
void
|
||||
setTimer();
|
||||
@@ -463,10 +462,6 @@ private:
|
||||
void
|
||||
onTimer(boost::system::error_code const& ec);
|
||||
|
||||
// Called when SSL shutdown completes
|
||||
void
|
||||
onShutdown(error_code ec);
|
||||
|
||||
void
|
||||
doAccept();
|
||||
|
||||
@@ -584,6 +579,8 @@ public:
|
||||
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMClose> const& m);
|
||||
|
||||
private:
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
@@ -104,8 +104,8 @@ protocolMessageName(int type)
|
||||
return "replay_delta_request";
|
||||
case protocol::mtREPLAY_DELTA_RESPONSE:
|
||||
return "replay_delta_response";
|
||||
default:
|
||||
break;
|
||||
case protocol::mtCLOSE:
|
||||
return "close";
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
@@ -470,6 +470,10 @@ invokeProtocolMessage(
|
||||
success = detail::invoke<protocol::TMReplayDeltaResponse>(
|
||||
*header, buffers, handler);
|
||||
break;
|
||||
case protocol::mtCLOSE:
|
||||
success =
|
||||
detail::invoke<protocol::TMClose>(*header, buffers, handler);
|
||||
break;
|
||||
default:
|
||||
handler.onMessageUnknown(header->message_type);
|
||||
success = true;
|
||||
|
||||
@@ -46,7 +46,7 @@ std::unordered_map<protocol::MessageType, TrafficCount::category> const
|
||||
{protocol::mtTRANSACTIONS,
|
||||
TrafficCount::category::requested_transactions},
|
||||
{protocol::mtSQUELCH, TrafficCount::category::squelch},
|
||||
};
|
||||
{protocol::mtCLOSE, TrafficCount::category::close}};
|
||||
|
||||
TrafficCount::category
|
||||
TrafficCount::categorize(
|
||||
|
||||
@@ -195,7 +195,7 @@ public:
|
||||
|
||||
// The total p2p bytes sent and received on the wire
|
||||
total,
|
||||
|
||||
close,
|
||||
unknown // must be last
|
||||
};
|
||||
|
||||
@@ -304,6 +304,7 @@ public:
|
||||
{replay_delta_response, "replay_delta_response"},
|
||||
{have_transactions, "have_transactions"},
|
||||
{requested_transactions, "requested_transactions"},
|
||||
{close, "close"},
|
||||
{total, "total"}};
|
||||
|
||||
if (auto it = category_map.find(cat); it != category_map.end())
|
||||
@@ -370,6 +371,7 @@ protected:
|
||||
{have_transactions, {have_transactions}},
|
||||
{requested_transactions, {requested_transactions}},
|
||||
{total, {total}},
|
||||
{close, {close}},
|
||||
{unknown, {unknown}},
|
||||
};
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user