From 6dbbb7406c99b53684fdde5effc95b372aa1745b Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 19 Oct 2015 16:09:01 -0700 Subject: [PATCH] Allow subscription to peer status changes (RIPD-579) Subscribe to "peer_status" stream (admin only) permits reception of "peerStatusChange" notifications. These can include the event the peer is reporting, the peer's new status, the peer's currently accepted ledger hash and sequence, the peer's network time, and the range of ledgers the peer has available for remote querying. --- src/ripple/app/misc/NetworkOPs.cpp | 46 ++++++++++++++++ src/ripple/net/InfoSub.h | 4 ++ src/ripple/net/impl/InfoSub.cpp | 1 + src/ripple/overlay/impl/PeerImp.cpp | 72 +++++++++++++++++++++++++ src/ripple/rpc/handlers/Subscribe.cpp | 7 +++ src/ripple/rpc/handlers/Unsubscribe.cpp | 3 ++ 6 files changed, 133 insertions(+) diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index e0c7af004..72e68a74b 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -382,6 +382,10 @@ public: bool subValidations (InfoSub::ref ispListener) override; bool unsubValidations (std::uint64_t uListener) override; + bool subPeerStatus (InfoSub::ref ispListener) override; + bool unsubPeerStatus (std::uint64_t uListener) override; + void pubPeerStatus (std::function const&) override; + InfoSub::pointer findRpcSub (std::string const& strUrl) override; InfoSub::pointer addRpcSub ( std::string const& strUrl, InfoSub::ref) override; @@ -464,6 +468,7 @@ private: SubMapType mSubTransactions; // All accepted transactions. SubMapType mSubRTTransactions; // All proposed and accepted transactions. SubMapType mSubValidations; // Received validations. + SubMapType mSubPeerStatus; // peer status changes std::uint32_t mLastLoadBase; std::uint32_t mLastLoadFactor; @@ -1540,6 +1545,33 @@ void NetworkOPsImp::pubValidation (STValidation::ref val) } } +void NetworkOPsImp::pubPeerStatus ( + std::function const& func) +{ + ScopedLockType sl (mSubLock); + + if (!mSubPeerStatus.empty ()) + { + Json::Value jvObj (func()); + + jvObj [jss::type] = "peerStatusChange"; + + for (auto i = mSubPeerStatus.begin (); i != mSubPeerStatus.end (); ) + { + InfoSub::pointer p = i->second.lock (); + + if (p) + { + p->send (jvObj, true); + ++i; + } + else + { + i = mSubValidations.erase (i); + } + } + } +} void NetworkOPsImp::setMode (OperatingMode om) { @@ -2540,6 +2572,20 @@ bool NetworkOPsImp::unsubValidations (std::uint64_t uSeq) return mSubValidations.erase (uSeq); } +// <-- bool: true=added, false=already there +bool NetworkOPsImp::subPeerStatus (InfoSub::ref isrListener) +{ + ScopedLockType sl (mSubLock); + return mSubPeerStatus.emplace (isrListener->getSeq (), isrListener).second; +} + +// <-- bool: true=erased, false=was not there +bool NetworkOPsImp::unsubPeerStatus (std::uint64_t uSeq) +{ + ScopedLockType sl (mSubLock); + return mSubPeerStatus.erase (uSeq); +} + InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl) { ScopedLockType sl (mSubLock); diff --git a/src/ripple/net/InfoSub.h b/src/ripple/net/InfoSub.h index 8124d271f..f9aa8e776 100644 --- a/src/ripple/net/InfoSub.h +++ b/src/ripple/net/InfoSub.h @@ -101,6 +101,10 @@ public: virtual bool subValidations (ref ispListener) = 0; virtual bool unsubValidations (std::uint64_t uListener) = 0; + virtual bool subPeerStatus (ref ispListener) = 0; + virtual bool unsubPeerStatus (std::uint64_t uListener) = 0; + virtual void pubPeerStatus (std::function const&) = 0; + // VFALCO TODO Remove // This was added for one particular partner, it // "pushes" subscription data to a particular URL. diff --git a/src/ripple/net/impl/InfoSub.cpp b/src/ripple/net/impl/InfoSub.cpp index e787c7b4c..73627a348 100644 --- a/src/ripple/net/impl/InfoSub.cpp +++ b/src/ripple/net/impl/InfoSub.cpp @@ -58,6 +58,7 @@ InfoSub::~InfoSub () m_source.unsubLedger (mSeq); m_source.unsubServer (mSeq); m_source.unsubValidations (mSeq); + m_source.unsubPeerStatus (mSeq); // Use the internal unsubscribe so that it won't call // back to us and modify its own parameter diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index a95f1d3de..9e0a6b9e1 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1335,6 +1335,78 @@ PeerImp::onMessage (std::shared_ptr const& m) { checkSanity (m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex()); } + + app_.getOPs().pubPeerStatus ( + [=]() -> Json::Value + { + Json::Value j = Json::objectValue; + + if (m->has_newstatus ()) + { + switch (m->newstatus ()) + { + case protocol::nsCONNECTING: + j[jss::status] = "CONNECTING"; + break; + case protocol::nsCONNECTED: + j[jss::status] = "CONNECTED"; + break; + case protocol::nsMONITORING: + j[jss::status] = "MONITORING"; + break; + case protocol::nsVALIDATING: + j[jss::status] = "VALIDATING"; + break; + case protocol::nsSHUTTING: + j[jss::status] = "SHUTTING"; + break; + } + } + + if (m->has_newevent()) + { + switch (m->newevent ()) + { + case protocol::neCLOSING_LEDGER: + j[jss::action] = "CLOSING_LEDGER"; + break; + case protocol::neACCEPTED_LEDGER: + j[jss::action] = "ACCEPTED_LEDGER"; + break; + case protocol::neSWITCHED_LEDGER: + j[jss::action] = "SWITCHED_LEDGER"; + break; + case protocol::neLOST_SYNC: + j[jss::action] = "LOST_SYNC"; + break; + } + } + + if (m->has_ledgerseq ()) + { + j[jss::ledger_index] = m->ledgerseq(); + } + + if (m->has_ledgerhash ()) + { + j[jss::ledger_hash] = to_string (closedLedgerHash_); + } + + if (m->has_networktime ()) + { + j[jss::date] = Json::UInt (m->networktime()); + } + + if (m->has_firstseq () && m->has_lastseq ()) + { + j[jss::ledger_index_min] = + Json::UInt (m->firstseq ()); + j[jss::ledger_index_max] = + Json::UInt (m->lastseq ()); + } + + return j; + }); } void diff --git a/src/ripple/rpc/handlers/Subscribe.cpp b/src/ripple/rpc/handlers/Subscribe.cpp index 8c1c22873..e413fc1fd 100644 --- a/src/ripple/rpc/handlers/Subscribe.cpp +++ b/src/ripple/rpc/handlers/Subscribe.cpp @@ -143,6 +143,13 @@ Json::Value doSubscribe (RPC::Context& context) { context.netOps.subValidations (ispSub); } + else if (streamName == "peer_status") + { + if (context.role != Role::ADMIN) + jvResult[jss::error] = "noPermission"; + else + context.netOps.subPeerStatus (ispSub); + } else { jvResult[jss::error] = "unknownStream"; diff --git a/src/ripple/rpc/handlers/Unsubscribe.cpp b/src/ripple/rpc/handlers/Unsubscribe.cpp index 0318e7807..ee4d6fdcf 100644 --- a/src/ripple/rpc/handlers/Unsubscribe.cpp +++ b/src/ripple/rpc/handlers/Unsubscribe.cpp @@ -83,6 +83,9 @@ Json::Value doUnsubscribe (RPC::Context& context) else if (streamName == "validations") context.netOps.unsubValidations (ispSub->getSeq ()); + else if (streamName == "peer_status") + context.netOps.unsubPeerStatus (ispSub->getSeq ()); + else jvResult[jss::error] = "Unknown stream: " + streamName; }