diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index e0c7af004..72e68a74b 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -382,6 +382,10 @@ public: bool subValidations (InfoSub::ref ispListener) override; bool unsubValidations (std::uint64_t uListener) override; + bool subPeerStatus (InfoSub::ref ispListener) override; + bool unsubPeerStatus (std::uint64_t uListener) override; + void pubPeerStatus (std::function const&) override; + InfoSub::pointer findRpcSub (std::string const& strUrl) override; InfoSub::pointer addRpcSub ( std::string const& strUrl, InfoSub::ref) override; @@ -464,6 +468,7 @@ private: SubMapType mSubTransactions; // All accepted transactions. SubMapType mSubRTTransactions; // All proposed and accepted transactions. SubMapType mSubValidations; // Received validations. + SubMapType mSubPeerStatus; // peer status changes std::uint32_t mLastLoadBase; std::uint32_t mLastLoadFactor; @@ -1540,6 +1545,33 @@ void NetworkOPsImp::pubValidation (STValidation::ref val) } } +void NetworkOPsImp::pubPeerStatus ( + std::function const& func) +{ + ScopedLockType sl (mSubLock); + + if (!mSubPeerStatus.empty ()) + { + Json::Value jvObj (func()); + + jvObj [jss::type] = "peerStatusChange"; + + for (auto i = mSubPeerStatus.begin (); i != mSubPeerStatus.end (); ) + { + InfoSub::pointer p = i->second.lock (); + + if (p) + { + p->send (jvObj, true); + ++i; + } + else + { + i = mSubValidations.erase (i); + } + } + } +} void NetworkOPsImp::setMode (OperatingMode om) { @@ -2540,6 +2572,20 @@ bool NetworkOPsImp::unsubValidations (std::uint64_t uSeq) return mSubValidations.erase (uSeq); } +// <-- bool: true=added, false=already there +bool NetworkOPsImp::subPeerStatus (InfoSub::ref isrListener) +{ + ScopedLockType sl (mSubLock); + return mSubPeerStatus.emplace (isrListener->getSeq (), isrListener).second; +} + +// <-- bool: true=erased, false=was not there +bool NetworkOPsImp::unsubPeerStatus (std::uint64_t uSeq) +{ + ScopedLockType sl (mSubLock); + return mSubPeerStatus.erase (uSeq); +} + InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl) { ScopedLockType sl (mSubLock); diff --git a/src/ripple/net/InfoSub.h b/src/ripple/net/InfoSub.h index 8124d271f..f9aa8e776 100644 --- a/src/ripple/net/InfoSub.h +++ b/src/ripple/net/InfoSub.h @@ -101,6 +101,10 @@ public: virtual bool subValidations (ref ispListener) = 0; virtual bool unsubValidations (std::uint64_t uListener) = 0; + virtual bool subPeerStatus (ref ispListener) = 0; + virtual bool unsubPeerStatus (std::uint64_t uListener) = 0; + virtual void pubPeerStatus (std::function const&) = 0; + // VFALCO TODO Remove // This was added for one particular partner, it // "pushes" subscription data to a particular URL. diff --git a/src/ripple/net/impl/InfoSub.cpp b/src/ripple/net/impl/InfoSub.cpp index e787c7b4c..73627a348 100644 --- a/src/ripple/net/impl/InfoSub.cpp +++ b/src/ripple/net/impl/InfoSub.cpp @@ -58,6 +58,7 @@ InfoSub::~InfoSub () m_source.unsubLedger (mSeq); m_source.unsubServer (mSeq); m_source.unsubValidations (mSeq); + m_source.unsubPeerStatus (mSeq); // Use the internal unsubscribe so that it won't call // back to us and modify its own parameter diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index a95f1d3de..9e0a6b9e1 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1335,6 +1335,78 @@ PeerImp::onMessage (std::shared_ptr const& m) { checkSanity (m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex()); } + + app_.getOPs().pubPeerStatus ( + [=]() -> Json::Value + { + Json::Value j = Json::objectValue; + + if (m->has_newstatus ()) + { + switch (m->newstatus ()) + { + case protocol::nsCONNECTING: + j[jss::status] = "CONNECTING"; + break; + case protocol::nsCONNECTED: + j[jss::status] = "CONNECTED"; + break; + case protocol::nsMONITORING: + j[jss::status] = "MONITORING"; + break; + case protocol::nsVALIDATING: + j[jss::status] = "VALIDATING"; + break; + case protocol::nsSHUTTING: + j[jss::status] = "SHUTTING"; + break; + } + } + + if (m->has_newevent()) + { + switch (m->newevent ()) + { + case protocol::neCLOSING_LEDGER: + j[jss::action] = "CLOSING_LEDGER"; + break; + case protocol::neACCEPTED_LEDGER: + j[jss::action] = "ACCEPTED_LEDGER"; + break; + case protocol::neSWITCHED_LEDGER: + j[jss::action] = "SWITCHED_LEDGER"; + break; + case protocol::neLOST_SYNC: + j[jss::action] = "LOST_SYNC"; + break; + } + } + + if (m->has_ledgerseq ()) + { + j[jss::ledger_index] = m->ledgerseq(); + } + + if (m->has_ledgerhash ()) + { + j[jss::ledger_hash] = to_string (closedLedgerHash_); + } + + if (m->has_networktime ()) + { + j[jss::date] = Json::UInt (m->networktime()); + } + + if (m->has_firstseq () && m->has_lastseq ()) + { + j[jss::ledger_index_min] = + Json::UInt (m->firstseq ()); + j[jss::ledger_index_max] = + Json::UInt (m->lastseq ()); + } + + return j; + }); } void diff --git a/src/ripple/rpc/handlers/Subscribe.cpp b/src/ripple/rpc/handlers/Subscribe.cpp index 8c1c22873..e413fc1fd 100644 --- a/src/ripple/rpc/handlers/Subscribe.cpp +++ b/src/ripple/rpc/handlers/Subscribe.cpp @@ -143,6 +143,13 @@ Json::Value doSubscribe (RPC::Context& context) { context.netOps.subValidations (ispSub); } + else if (streamName == "peer_status") + { + if (context.role != Role::ADMIN) + jvResult[jss::error] = "noPermission"; + else + context.netOps.subPeerStatus (ispSub); + } else { jvResult[jss::error] = "unknownStream"; diff --git a/src/ripple/rpc/handlers/Unsubscribe.cpp b/src/ripple/rpc/handlers/Unsubscribe.cpp index 0318e7807..ee4d6fdcf 100644 --- a/src/ripple/rpc/handlers/Unsubscribe.cpp +++ b/src/ripple/rpc/handlers/Unsubscribe.cpp @@ -83,6 +83,9 @@ Json::Value doUnsubscribe (RPC::Context& context) else if (streamName == "validations") context.netOps.unsubValidations (ispSub->getSeq ()); + else if (streamName == "peer_status") + context.netOps.unsubPeerStatus (ispSub->getSeq ()); + else jvResult[jss::error] = "Unknown stream: " + streamName; }