From 15c5f9c1111eeea0743dbd9d9b0028756ff72ade Mon Sep 17 00:00:00 2001 From: Mo Morsi Date: Sat, 17 Aug 2019 10:43:55 -0400 Subject: [PATCH] Report consensus phase changes in the server subscription stream --- src/ripple/app/consensus/RCLConsensus.h | 6 ++ src/ripple/app/misc/NetworkOPs.cpp | 75 ++++++++++++++++++++++++- src/ripple/consensus/Consensus.h | 6 ++ src/ripple/net/InfoSub.h | 3 + src/ripple/net/impl/InfoSub.cpp | 1 + src/ripple/rpc/handlers/Subscribe.cpp | 4 ++ src/ripple/rpc/handlers/Unsubscribe.cpp | 4 ++ 7 files changed, 97 insertions(+), 2 deletions(-) diff --git a/src/ripple/app/consensus/RCLConsensus.h b/src/ripple/app/consensus/RCLConsensus.h index d238e39bf..8298a9602 100644 --- a/src/ripple/app/consensus/RCLConsensus.h +++ b/src/ripple/app/consensus/RCLConsensus.h @@ -455,6 +455,12 @@ public: return adaptor_.mode(); } + ConsensusPhase + phase() const + { + return consensus_.phase(); + } + //! @see Consensus::getJson Json::Value getJson(bool full) const; diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index b4e493270..537f8d996 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -385,6 +385,7 @@ public: boost::optional consensusDelay) override; uint256 getConsensusLCL () override; void reportFeeChange () override; + void reportConsensusStateChange(ConsensusPhase phase); void updateLocalTx (ReadView const& view) override { @@ -486,6 +487,9 @@ public: bool unsubPeerStatus (std::uint64_t uListener) override; void pubPeerStatus (std::function const&) override; + bool subConsensus (InfoSub::ref ispListener) override; + bool unsubConsensus (std::uint64_t uListener) override; + InfoSub::pointer findRpcSub (std::string const& strUrl) override; InfoSub::pointer addRpcSub ( std::string const& strUrl, InfoSub::ref) override; @@ -543,6 +547,7 @@ private: bool isAccepted); void pubServer (); + void pubConsensus (ConsensusPhase phase); std::string getHostId (bool forAdmin); @@ -570,6 +575,8 @@ private: RCLConsensus mConsensus; + ConsensusPhase mLastConsensusPhase; + LedgerMaster& m_ledgerMaster; std::shared_ptr mAcquiringLedger; @@ -587,9 +594,10 @@ private: sRTTransactions, // All proposed and accepted transactions. sValidations, // Received validations. sPeerStatus, // Peer status changes. + sConsensusPhase, // Consensus phase - sLastEntry = sPeerStatus // as this name implies, any new entry must - // be ADDED ABOVE this one + sLastEntry = sConsensusPhase // as this name implies, any new entry must + // be ADDED ABOVE this one }; std::array mStreamMaps; @@ -769,6 +777,13 @@ void NetworkOPsImp::processHeartbeatTimer () mConsensus.timerEntry (app_.timeKeeper().closeTime()); + const ConsensusPhase currPhase = mConsensus.phase(); + if (mLastConsensusPhase != currPhase) + { + reportConsensusStateChange(currPhase); + mLastConsensusPhase = currPhase; + } + setHeartbeatTimer (); } @@ -1464,6 +1479,13 @@ bool NetworkOPsImp::beginConsensus (uint256 const& networkClosed) prevLedger, changes.removed); + const ConsensusPhase currPhase = mConsensus.phase(); + if (mLastConsensusPhase != currPhase) + { + reportConsensusStateChange(currPhase); + mLastConsensusPhase = currPhase; + } + JLOG(m_journal.debug()) << "Initiating consensus engine"; return true; } @@ -1711,6 +1733,33 @@ void NetworkOPsImp::pubServer () } } +void NetworkOPsImp::pubConsensus (ConsensusPhase phase) +{ + std::lock_guard sl (mSubLock); + + auto& streamMap = mStreamMaps[sConsensusPhase]; + if (!streamMap.empty ()) + { + Json::Value jvObj (Json::objectValue); + jvObj [jss::type] = "consensusPhase"; + jvObj [jss::consensus] = to_string(phase); + + for (auto i = streamMap.begin (); + i != streamMap.end (); ) + { + if (auto p = i->second.lock()) + { + p->send (jvObj, true); + ++i; + } + else + { + i = streamMap.erase (i); + } + } + } +} + void NetworkOPsImp::pubValidation (STValidation::ref val) { @@ -2517,6 +2566,13 @@ void NetworkOPsImp::reportFeeChange () } } +void NetworkOPsImp::reportConsensusStateChange (ConsensusPhase phase) +{ + m_job_queue.addJob ( + jtCLIENT, "reportConsensusStateChange->pubConsensus", + [this, phase] (Job&) { pubConsensus(phase); }); +} + // This routine should only be used to publish accepted or validated // transactions. Json::Value NetworkOPsImp::transJson( @@ -2973,6 +3029,21 @@ bool NetworkOPsImp::unsubPeerStatus (std::uint64_t uSeq) return mStreamMaps[sPeerStatus].erase (uSeq); } +// <-- bool: true=added, false=already there +bool NetworkOPsImp::subConsensus (InfoSub::ref isrListener) +{ + std::lock_guard sl (mSubLock); + return mStreamMaps[sConsensusPhase].emplace ( + isrListener->getSeq (), isrListener).second; +} + +// <-- bool: true=erased, false=was not there +bool NetworkOPsImp::unsubConsensus (std::uint64_t uSeq) +{ + std::lock_guard sl (mSubLock); + return mStreamMaps[sConsensusPhase].erase (uSeq); +} + InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl) { std::lock_guard sl (mSubLock); diff --git a/src/ripple/consensus/Consensus.h b/src/ripple/consensus/Consensus.h index 11a3b0daa..3aa5e0db4 100644 --- a/src/ripple/consensus/Consensus.h +++ b/src/ripple/consensus/Consensus.h @@ -413,6 +413,12 @@ public: return prevLedgerID_; } + ConsensusPhase + phase() const + { + return phase_; + } + /** Get the Json state of the consensus process. Called by the consensus_info RPC. diff --git a/src/ripple/net/InfoSub.h b/src/ripple/net/InfoSub.h index 6d97ffc39..a8c221b12 100644 --- a/src/ripple/net/InfoSub.h +++ b/src/ripple/net/InfoSub.h @@ -109,6 +109,9 @@ public: virtual bool unsubPeerStatus (std::uint64_t uListener) = 0; virtual void pubPeerStatus (std::function const&) = 0; + virtual bool subConsensus (ref ispListener) = 0; + virtual bool unsubConsensus (std::uint64_t uListener) = 0; + // VFALCO TODO Remove // This was added for one particular partner, it // "pushes" subscription data to a particular URL. diff --git a/src/ripple/net/impl/InfoSub.cpp b/src/ripple/net/impl/InfoSub.cpp index bfe4fd642..0513ec192 100644 --- a/src/ripple/net/impl/InfoSub.cpp +++ b/src/ripple/net/impl/InfoSub.cpp @@ -64,6 +64,7 @@ InfoSub::~InfoSub () m_source.unsubServer (mSeq); m_source.unsubValidations (mSeq); m_source.unsubPeerStatus (mSeq); + m_source.unsubConsensus (mSeq); // Use the internal unsubscribe so that it won't call // back to us and modify its own parameter diff --git a/src/ripple/rpc/handlers/Subscribe.cpp b/src/ripple/rpc/handlers/Subscribe.cpp index 4733389f3..3279f2123 100644 --- a/src/ripple/rpc/handlers/Subscribe.cpp +++ b/src/ripple/rpc/handlers/Subscribe.cpp @@ -152,6 +152,10 @@ Json::Value doSubscribe (RPC::Context& context) return rpcError(rpcNO_PERMISSION); context.netOps.subPeerStatus (ispSub); } + else if (streamName == "consensus") + { + context.netOps.subConsensus (ispSub); + } else { return rpcError(rpcSTREAM_MALFORMED); diff --git a/src/ripple/rpc/handlers/Unsubscribe.cpp b/src/ripple/rpc/handlers/Unsubscribe.cpp index 4bb9f9ac1..799189ae5 100644 --- a/src/ripple/rpc/handlers/Unsubscribe.cpp +++ b/src/ripple/rpc/handlers/Unsubscribe.cpp @@ -97,6 +97,10 @@ Json::Value doUnsubscribe (RPC::Context& context) { context.netOps.unsubPeerStatus (ispSub->getSeq ()); } + else if (streamName == "consensus") + { + context.netOps.unsubConsensus (ispSub->getSeq()); + } else { return rpcError(rpcSTREAM_MALFORMED);