mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-20 19:15:54 +00:00
Compare commits
2 Commits
bthomee/re
...
tapanito/f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a15abd4067 | ||
|
|
d9695be838 |
@@ -26,6 +26,7 @@ enum MessageType {
|
|||||||
mtREPLAY_DELTA_RESPONSE = 60;
|
mtREPLAY_DELTA_RESPONSE = 60;
|
||||||
mtHAVE_TRANSACTIONS = 63;
|
mtHAVE_TRANSACTIONS = 63;
|
||||||
mtTRANSACTIONS = 64;
|
mtTRANSACTIONS = 64;
|
||||||
|
mtCLOSE = 65;
|
||||||
}
|
}
|
||||||
|
|
||||||
// token, iterations, target, challenge = issue demand for proof of work
|
// token, iterations, target, challenge = issue demand for proof of work
|
||||||
@@ -341,3 +342,19 @@ message TMReplayDeltaResponse {
|
|||||||
message TMHaveTransactions {
|
message TMHaveTransactions {
|
||||||
repeated bytes hashes = 1;
|
repeated bytes hashes = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum TMCloseReason {
|
||||||
|
crRESOURCE = 1;
|
||||||
|
crINVALID_CLOSED_LEDGER = 2;
|
||||||
|
crINVALID_PREV_LEDGER = 3;
|
||||||
|
crBAD_LEDGER_HEADERS = 4;
|
||||||
|
crLARGE_SEND_QUEUE = 5;
|
||||||
|
crNOT_USEFUL = 6;
|
||||||
|
crPING_TIMEOUT = 7;
|
||||||
|
crINTERNAL = 8;
|
||||||
|
crSHUTDOWN = 9;
|
||||||
|
}
|
||||||
|
|
||||||
|
message TMClose {
|
||||||
|
required TMCloseReason reason = 1;
|
||||||
|
}
|
||||||
|
|||||||
@@ -64,6 +64,34 @@ std::chrono::seconds constexpr peerTimerInterval{60};
|
|||||||
// TODO: Remove this exclusion once unit tests are added after the hotfix
|
// TODO: Remove this exclusion once unit tests are added after the hotfix
|
||||||
// release.
|
// release.
|
||||||
|
|
||||||
|
std::string
|
||||||
|
to_string(protocol::TMCloseReason reason)
|
||||||
|
{
|
||||||
|
switch (reason)
|
||||||
|
{
|
||||||
|
case protocol::crRESOURCE:
|
||||||
|
return "Too Many P2P Messages";
|
||||||
|
case protocol::crINVALID_CLOSED_LEDGER:
|
||||||
|
return "Invalid Closed Ledger Header";
|
||||||
|
case protocol::crINVALID_PREV_LEDGER:
|
||||||
|
return "Invalid Previous Ledger Header";
|
||||||
|
case protocol::crBAD_LEDGER_HEADERS:
|
||||||
|
return "Bad Ledger Headers";
|
||||||
|
case protocol::crLARGE_SEND_QUEUE:
|
||||||
|
return "Large Send Queue";
|
||||||
|
case protocol::crNOT_USEFUL:
|
||||||
|
return "Peer Not Useful";
|
||||||
|
case protocol::crPING_TIMEOUT:
|
||||||
|
return "Ping Timeout";
|
||||||
|
case protocol::crINTERNAL:
|
||||||
|
return "Internal Error";
|
||||||
|
case protocol::crSHUTDOWN:
|
||||||
|
return "Shutdown";
|
||||||
|
}
|
||||||
|
|
||||||
|
return "unknown";
|
||||||
|
}
|
||||||
|
|
||||||
PeerImp::PeerImp(
|
PeerImp::PeerImp(
|
||||||
Application& app,
|
Application& app,
|
||||||
id_t id,
|
id_t id,
|
||||||
@@ -178,7 +206,7 @@ PeerImp::run()
|
|||||||
closed = parseLedgerHash(iter->value());
|
closed = parseLedgerHash(iter->value());
|
||||||
|
|
||||||
if (!closed)
|
if (!closed)
|
||||||
fail("Malformed handshake data (1)");
|
fail(protocol::TMCloseReason::crINVALID_CLOSED_LEDGER);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (auto const iter = headers_.find("Previous-Ledger");
|
if (auto const iter = headers_.find("Previous-Ledger");
|
||||||
@@ -187,11 +215,11 @@ PeerImp::run()
|
|||||||
previous = parseLedgerHash(iter->value());
|
previous = parseLedgerHash(iter->value());
|
||||||
|
|
||||||
if (!previous)
|
if (!previous)
|
||||||
fail("Malformed handshake data (2)");
|
fail(protocol::TMCloseReason::crINVALID_PREV_LEDGER);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (previous && !closed)
|
if (previous && !closed)
|
||||||
fail("Malformed handshake data (3)");
|
fail(protocol::TMCloseReason::crBAD_LEDGER_HEADERS);
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> sl(recentLock_);
|
std::lock_guard<std::mutex> sl(recentLock_);
|
||||||
@@ -231,7 +259,8 @@ PeerImp::stop()
|
|||||||
JLOG(journal_.info()) << "Stop";
|
JLOG(journal_.info()) << "Stop";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
close();
|
|
||||||
|
sendAndClose(protocol::TMCloseReason::crSHUTDOWN);
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
@@ -241,10 +270,6 @@ PeerImp::send(std::shared_ptr<Message> const& m)
|
|||||||
{
|
{
|
||||||
if (!strand_.running_in_this_thread())
|
if (!strand_.running_in_this_thread())
|
||||||
return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
|
return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
|
||||||
if (gracefulClose_)
|
|
||||||
return;
|
|
||||||
if (detaching_)
|
|
||||||
return;
|
|
||||||
|
|
||||||
auto validator = m->getValidatorKey();
|
auto validator = m->getValidatorKey();
|
||||||
if (validator && !squelch_.expireSquelch(*validator))
|
if (validator && !squelch_.expireSquelch(*validator))
|
||||||
@@ -356,7 +381,7 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
|
|||||||
{
|
{
|
||||||
// Sever the connection
|
// Sever the connection
|
||||||
overlay_.incPeerDisconnectCharges();
|
overlay_.incPeerDisconnectCharges();
|
||||||
fail("charge: Resources");
|
fail(protocol::TMCloseReason::crRESOURCE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -580,7 +605,6 @@ PeerImp::close()
|
|||||||
"ripple::PeerImp::close : strand in this thread");
|
"ripple::PeerImp::close : strand in this thread");
|
||||||
if (socket_.is_open())
|
if (socket_.is_open())
|
||||||
{
|
{
|
||||||
detaching_ = true; // DEPRECATED
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
timer_.cancel();
|
timer_.cancel();
|
||||||
@@ -604,22 +628,25 @@ PeerImp::close()
|
|||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
PeerImp::fail(std::string const& reason)
|
PeerImp::fail(protocol::TMCloseReason reason)
|
||||||
{
|
{
|
||||||
if (!strand_.running_in_this_thread())
|
if (!strand_.running_in_this_thread())
|
||||||
return post(
|
return post(
|
||||||
strand_,
|
strand_,
|
||||||
std::bind(
|
std::bind(
|
||||||
(void(Peer::*)(std::string const&)) & PeerImp::fail,
|
(void(Peer::*)(protocol::TMCloseReason)) & PeerImp::fail,
|
||||||
shared_from_this(),
|
shared_from_this(),
|
||||||
reason));
|
reason));
|
||||||
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
|
|
||||||
|
if (journal_.active(beast::severities::kWarning) && socket_.is_open() &&
|
||||||
|
reason != protocol::TMCloseReason::crINTERNAL)
|
||||||
{
|
{
|
||||||
std::string const n = name();
|
std::string const n = name();
|
||||||
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
|
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
|
||||||
<< " failed: " << reason;
|
<< " failed: " << to_string(reason);
|
||||||
}
|
}
|
||||||
close();
|
|
||||||
|
sendAndClose(reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@@ -634,28 +661,28 @@ PeerImp::fail(std::string const& name, error_code ec)
|
|||||||
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
|
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
|
||||||
<< " at " << remote_address_.to_string() << ": " << ec.message();
|
<< " at " << remote_address_.to_string() << ": " << ec.message();
|
||||||
}
|
}
|
||||||
close();
|
|
||||||
|
sendAndClose(protocol::TMCloseReason::crINTERNAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
PeerImp::gracefulClose()
|
PeerImp::sendAndClose(protocol::TMCloseReason reason)
|
||||||
{
|
{
|
||||||
XRPL_ASSERT(
|
if (shutdown_)
|
||||||
strand_.running_in_this_thread(),
|
|
||||||
"ripple::PeerImp::gracefulClose : strand in this thread");
|
|
||||||
XRPL_ASSERT(
|
|
||||||
socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
|
|
||||||
XRPL_ASSERT(
|
|
||||||
!gracefulClose_,
|
|
||||||
"ripple::PeerImp::gracefulClose : socket is not closing");
|
|
||||||
gracefulClose_ = true;
|
|
||||||
if (send_queue_.size() > 0)
|
|
||||||
return;
|
return;
|
||||||
setTimer();
|
|
||||||
stream_.async_shutdown(bind_executor(
|
// erase all outstanding messages except for the one
|
||||||
strand_,
|
// currently being executed
|
||||||
std::bind(
|
if (send_queue_.size() > 1)
|
||||||
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
|
{
|
||||||
|
decltype(send_queue_) q({send_queue_.front()});
|
||||||
|
send_queue_.swap(q);
|
||||||
|
}
|
||||||
|
|
||||||
|
shutdown_ = true;
|
||||||
|
protocol::TMClose tmGC;
|
||||||
|
tmGC.set_reason(reason);
|
||||||
|
send(std::make_shared<Message>(tmGC, protocol::mtCLOSE));
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@@ -713,14 +740,11 @@ PeerImp::onTimer(error_code const& ec)
|
|||||||
{
|
{
|
||||||
// This should never happen
|
// This should never happen
|
||||||
JLOG(journal_.error()) << "onTimer: " << ec.message();
|
JLOG(journal_.error()) << "onTimer: " << ec.message();
|
||||||
return close();
|
return fail(protocol::TMCloseReason::crINTERNAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (large_sendq_++ >= Tuning::sendqIntervals)
|
if (large_sendq_++ >= Tuning::sendqIntervals)
|
||||||
{
|
return fail(protocol::TMCloseReason::crLARGE_SEND_QUEUE);
|
||||||
fail("Large send queue");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
|
if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
|
||||||
{
|
{
|
||||||
@@ -737,17 +761,13 @@ PeerImp::onTimer(error_code const& ec)
|
|||||||
(duration > app_.config().MAX_UNKNOWN_TIME)))
|
(duration > app_.config().MAX_UNKNOWN_TIME)))
|
||||||
{
|
{
|
||||||
overlay_.peerFinder().on_failure(slot_);
|
overlay_.peerFinder().on_failure(slot_);
|
||||||
fail("Not useful");
|
return fail(protocol::TMCloseReason::crNOT_USEFUL);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Already waiting for PONG
|
// Already waiting for PONG
|
||||||
if (lastPingSeq_)
|
if (lastPingSeq_)
|
||||||
{
|
return fail(protocol::TMCloseReason::crPING_TIMEOUT);
|
||||||
fail("Ping Timeout");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
lastPingTime_ = clock_type::now();
|
lastPingTime_ = clock_type::now();
|
||||||
lastPingSeq_ = rand_int<std::uint32_t>();
|
lastPingSeq_ = rand_int<std::uint32_t>();
|
||||||
@@ -761,21 +781,6 @@ PeerImp::onTimer(error_code const& ec)
|
|||||||
setTimer();
|
setTimer();
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
|
||||||
PeerImp::onShutdown(error_code ec)
|
|
||||||
{
|
|
||||||
cancelTimer();
|
|
||||||
// If we don't get eof then something went wrong
|
|
||||||
if (!ec)
|
|
||||||
{
|
|
||||||
JLOG(journal_.error()) << "onShutdown: expected error condition";
|
|
||||||
return close();
|
|
||||||
}
|
|
||||||
if (ec != boost::asio::error::eof)
|
|
||||||
return fail("onShutdown", ec);
|
|
||||||
close();
|
|
||||||
}
|
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
void
|
void
|
||||||
PeerImp::doAccept()
|
PeerImp::doAccept()
|
||||||
@@ -791,7 +796,10 @@ PeerImp::doAccept()
|
|||||||
// This shouldn't fail since we already computed
|
// This shouldn't fail since we already computed
|
||||||
// the shared value successfully in OverlayImpl
|
// the shared value successfully in OverlayImpl
|
||||||
if (!sharedValue)
|
if (!sharedValue)
|
||||||
return fail("makeSharedValue: Unexpected failure");
|
{
|
||||||
|
JLOG(journal_.error()) << "doAccept: makeSharedValue failed";
|
||||||
|
return fail(protocol::TMCloseReason::crINTERNAL);
|
||||||
|
}
|
||||||
|
|
||||||
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
|
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
|
||||||
JLOG(journal_.info()) << "Public Key: "
|
JLOG(journal_.info()) << "Public Key: "
|
||||||
@@ -841,7 +849,9 @@ PeerImp::doAccept()
|
|||||||
return fail("onWriteResponse", ec);
|
return fail("onWriteResponse", ec);
|
||||||
if (write_buffer->size() == bytes_transferred)
|
if (write_buffer->size() == bytes_transferred)
|
||||||
return doProtocolStart();
|
return doProtocolStart();
|
||||||
return fail("Failed to write header");
|
|
||||||
|
JLOG(journal_.error()) << "Failed to write header";
|
||||||
|
return fail(protocol::TMCloseReason::crINTERNAL);
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -905,15 +915,19 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
|||||||
{
|
{
|
||||||
if (!socket_.is_open())
|
if (!socket_.is_open())
|
||||||
return;
|
return;
|
||||||
if (ec == boost::asio::error::operation_aborted)
|
|
||||||
|
// we started closing the local connection, stop reading
|
||||||
|
if (ec == boost::asio::error::operation_aborted || shutdown_)
|
||||||
return;
|
return;
|
||||||
if (ec == boost::asio::error::eof)
|
if (ec == boost::asio::error::eof)
|
||||||
{
|
{
|
||||||
|
// Peer initiated connection close, just clean up
|
||||||
JLOG(journal_.info()) << "EOF";
|
JLOG(journal_.info()) << "EOF";
|
||||||
return gracefulClose();
|
return close();
|
||||||
}
|
}
|
||||||
if (ec)
|
if (ec)
|
||||||
return fail("onReadMessage", ec);
|
return fail("onReadMessage", ec);
|
||||||
|
|
||||||
if (auto stream = journal_.trace())
|
if (auto stream = journal_.trace())
|
||||||
{
|
{
|
||||||
if (bytes_transferred > 0)
|
if (bytes_transferred > 0)
|
||||||
@@ -945,8 +959,6 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
|||||||
return fail("onReadMessage", ec);
|
return fail("onReadMessage", ec);
|
||||||
if (!socket_.is_open())
|
if (!socket_.is_open())
|
||||||
return;
|
return;
|
||||||
if (gracefulClose_)
|
|
||||||
return;
|
|
||||||
if (bytes_consumed == 0)
|
if (bytes_consumed == 0)
|
||||||
break;
|
break;
|
||||||
read_buffer_.consume(bytes_consumed);
|
read_buffer_.consume(bytes_consumed);
|
||||||
@@ -969,6 +981,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
|||||||
{
|
{
|
||||||
if (!socket_.is_open())
|
if (!socket_.is_open())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (ec == boost::asio::error::operation_aborted)
|
if (ec == boost::asio::error::operation_aborted)
|
||||||
return;
|
return;
|
||||||
if (ec)
|
if (ec)
|
||||||
@@ -1002,16 +1015,9 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
|||||||
std::placeholders::_1,
|
std::placeholders::_1,
|
||||||
std::placeholders::_2)));
|
std::placeholders::_2)));
|
||||||
}
|
}
|
||||||
|
// If the send queue is empty and we are shutting down, close the connection
|
||||||
if (gracefulClose_)
|
else if (shutdown_)
|
||||||
{
|
close();
|
||||||
return stream_.async_shutdown(bind_executor(
|
|
||||||
strand_,
|
|
||||||
std::bind(
|
|
||||||
&PeerImp::onShutdown,
|
|
||||||
shared_from_this(),
|
|
||||||
std::placeholders::_1)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
@@ -2746,6 +2752,20 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
|
|||||||
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
|
<< "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
PeerImp::onMessage(std::shared_ptr<protocol::TMClose> const& m)
|
||||||
|
{
|
||||||
|
if (m->has_reason())
|
||||||
|
{
|
||||||
|
JLOG(p_journal_.debug())
|
||||||
|
<< "onMessage: TMClose: peer closed the connection: "
|
||||||
|
<< to_string(m->reason());
|
||||||
|
}
|
||||||
|
|
||||||
|
// do not send a close message when the peer initiates the shutdown
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
|
|
||||||
void
|
void
|
||||||
|
|||||||
@@ -95,7 +95,7 @@ private:
|
|||||||
|
|
||||||
std::atomic<Tracking> tracking_;
|
std::atomic<Tracking> tracking_;
|
||||||
clock_type::time_point trackingTime_;
|
clock_type::time_point trackingTime_;
|
||||||
bool detaching_ = false;
|
bool shutdown_ = false;
|
||||||
// Node public key of peer.
|
// Node public key of peer.
|
||||||
PublicKey const publicKey_;
|
PublicKey const publicKey_;
|
||||||
std::string name_;
|
std::string name_;
|
||||||
@@ -175,7 +175,6 @@ private:
|
|||||||
http_response_type response_;
|
http_response_type response_;
|
||||||
boost::beast::http::fields const& headers_;
|
boost::beast::http::fields const& headers_;
|
||||||
std::queue<std::shared_ptr<Message>> send_queue_;
|
std::queue<std::shared_ptr<Message>> send_queue_;
|
||||||
bool gracefulClose_ = false;
|
|
||||||
int large_sendq_ = 0;
|
int large_sendq_ = 0;
|
||||||
std::unique_ptr<LoadEvent> load_event_;
|
std::unique_ptr<LoadEvent> load_event_;
|
||||||
// The highest sequence of each PublisherList that has
|
// The highest sequence of each PublisherList that has
|
||||||
@@ -426,7 +425,7 @@ public:
|
|||||||
isHighLatency() const override;
|
isHighLatency() const override;
|
||||||
|
|
||||||
void
|
void
|
||||||
fail(std::string const& reason);
|
fail(protocol::TMCloseReason reason);
|
||||||
|
|
||||||
bool
|
bool
|
||||||
compressionEnabled() const override
|
compressionEnabled() const override
|
||||||
@@ -445,10 +444,10 @@ private:
|
|||||||
close();
|
close();
|
||||||
|
|
||||||
void
|
void
|
||||||
fail(std::string const& name, error_code ec);
|
sendAndClose(protocol::TMCloseReason reason);
|
||||||
|
|
||||||
void
|
void
|
||||||
gracefulClose();
|
fail(std::string const& name, error_code ec);
|
||||||
|
|
||||||
void
|
void
|
||||||
setTimer();
|
setTimer();
|
||||||
@@ -463,10 +462,6 @@ private:
|
|||||||
void
|
void
|
||||||
onTimer(boost::system::error_code const& ec);
|
onTimer(boost::system::error_code const& ec);
|
||||||
|
|
||||||
// Called when SSL shutdown completes
|
|
||||||
void
|
|
||||||
onShutdown(error_code ec);
|
|
||||||
|
|
||||||
void
|
void
|
||||||
doAccept();
|
doAccept();
|
||||||
|
|
||||||
@@ -584,6 +579,8 @@ public:
|
|||||||
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
|
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
|
||||||
void
|
void
|
||||||
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
|
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMClose> const& m);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -104,8 +104,8 @@ protocolMessageName(int type)
|
|||||||
return "replay_delta_request";
|
return "replay_delta_request";
|
||||||
case protocol::mtREPLAY_DELTA_RESPONSE:
|
case protocol::mtREPLAY_DELTA_RESPONSE:
|
||||||
return "replay_delta_response";
|
return "replay_delta_response";
|
||||||
default:
|
case protocol::mtCLOSE:
|
||||||
break;
|
return "close";
|
||||||
}
|
}
|
||||||
return "unknown";
|
return "unknown";
|
||||||
}
|
}
|
||||||
@@ -470,6 +470,10 @@ invokeProtocolMessage(
|
|||||||
success = detail::invoke<protocol::TMReplayDeltaResponse>(
|
success = detail::invoke<protocol::TMReplayDeltaResponse>(
|
||||||
*header, buffers, handler);
|
*header, buffers, handler);
|
||||||
break;
|
break;
|
||||||
|
case protocol::mtCLOSE:
|
||||||
|
success =
|
||||||
|
detail::invoke<protocol::TMClose>(*header, buffers, handler);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
handler.onMessageUnknown(header->message_type);
|
handler.onMessageUnknown(header->message_type);
|
||||||
success = true;
|
success = true;
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ std::unordered_map<protocol::MessageType, TrafficCount::category> const
|
|||||||
{protocol::mtTRANSACTIONS,
|
{protocol::mtTRANSACTIONS,
|
||||||
TrafficCount::category::requested_transactions},
|
TrafficCount::category::requested_transactions},
|
||||||
{protocol::mtSQUELCH, TrafficCount::category::squelch},
|
{protocol::mtSQUELCH, TrafficCount::category::squelch},
|
||||||
};
|
{protocol::mtCLOSE, TrafficCount::category::close}};
|
||||||
|
|
||||||
TrafficCount::category
|
TrafficCount::category
|
||||||
TrafficCount::categorize(
|
TrafficCount::categorize(
|
||||||
|
|||||||
@@ -195,7 +195,7 @@ public:
|
|||||||
|
|
||||||
// The total p2p bytes sent and received on the wire
|
// The total p2p bytes sent and received on the wire
|
||||||
total,
|
total,
|
||||||
|
close,
|
||||||
unknown // must be last
|
unknown // must be last
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -304,6 +304,7 @@ public:
|
|||||||
{replay_delta_response, "replay_delta_response"},
|
{replay_delta_response, "replay_delta_response"},
|
||||||
{have_transactions, "have_transactions"},
|
{have_transactions, "have_transactions"},
|
||||||
{requested_transactions, "requested_transactions"},
|
{requested_transactions, "requested_transactions"},
|
||||||
|
{close, "close"},
|
||||||
{total, "total"}};
|
{total, "total"}};
|
||||||
|
|
||||||
if (auto it = category_map.find(cat); it != category_map.end())
|
if (auto it = category_map.find(cat); it != category_map.end())
|
||||||
@@ -370,6 +371,7 @@ protected:
|
|||||||
{have_transactions, {have_transactions}},
|
{have_transactions, {have_transactions}},
|
||||||
{requested_transactions, {requested_transactions}},
|
{requested_transactions, {requested_transactions}},
|
||||||
{total, {total}},
|
{total, {total}},
|
||||||
|
{close, {close}},
|
||||||
{unknown, {unknown}},
|
{unknown, {unknown}},
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user