Report consensus phase changes in the server subscription stream

This commit is contained in:
Mo Morsi
2019-08-17 10:43:55 -04:00
committed by Manoj doshi
parent 726dd69ab9
commit 15c5f9c111
7 changed files with 97 additions and 2 deletions

View File

@@ -385,6 +385,7 @@ public:
boost::optional<std::chrono::milliseconds> consensusDelay) override;
uint256 getConsensusLCL () override;
void reportFeeChange () override;
void reportConsensusStateChange(ConsensusPhase phase);
void updateLocalTx (ReadView const& view) override
{
@@ -486,6 +487,9 @@ public:
bool unsubPeerStatus (std::uint64_t uListener) override;
void pubPeerStatus (std::function<Json::Value(void)> const&) override;
bool subConsensus (InfoSub::ref ispListener) override;
bool unsubConsensus (std::uint64_t uListener) override;
InfoSub::pointer findRpcSub (std::string const& strUrl) override;
InfoSub::pointer addRpcSub (
std::string const& strUrl, InfoSub::ref) override;
@@ -543,6 +547,7 @@ private:
bool isAccepted);
void pubServer ();
void pubConsensus (ConsensusPhase phase);
std::string getHostId (bool forAdmin);
@@ -570,6 +575,8 @@ private:
RCLConsensus mConsensus;
ConsensusPhase mLastConsensusPhase;
LedgerMaster& m_ledgerMaster;
std::shared_ptr<InboundLedger> mAcquiringLedger;
@@ -587,9 +594,10 @@ private:
sRTTransactions, // All proposed and accepted transactions.
sValidations, // Received validations.
sPeerStatus, // Peer status changes.
sConsensusPhase, // Consensus phase
sLastEntry = sPeerStatus // as this name implies, any new entry must
// be ADDED ABOVE this one
sLastEntry = sConsensusPhase // as this name implies, any new entry must
// be ADDED ABOVE this one
};
std::array<SubMapType, SubTypes::sLastEntry+1> mStreamMaps;
@@ -769,6 +777,13 @@ void NetworkOPsImp::processHeartbeatTimer ()
mConsensus.timerEntry (app_.timeKeeper().closeTime());
const ConsensusPhase currPhase = mConsensus.phase();
if (mLastConsensusPhase != currPhase)
{
reportConsensusStateChange(currPhase);
mLastConsensusPhase = currPhase;
}
setHeartbeatTimer ();
}
@@ -1464,6 +1479,13 @@ bool NetworkOPsImp::beginConsensus (uint256 const& networkClosed)
prevLedger,
changes.removed);
const ConsensusPhase currPhase = mConsensus.phase();
if (mLastConsensusPhase != currPhase)
{
reportConsensusStateChange(currPhase);
mLastConsensusPhase = currPhase;
}
JLOG(m_journal.debug()) << "Initiating consensus engine";
return true;
}
@@ -1711,6 +1733,33 @@ void NetworkOPsImp::pubServer ()
}
}
void NetworkOPsImp::pubConsensus (ConsensusPhase phase)
{
std::lock_guard sl (mSubLock);
auto& streamMap = mStreamMaps[sConsensusPhase];
if (!streamMap.empty ())
{
Json::Value jvObj (Json::objectValue);
jvObj [jss::type] = "consensusPhase";
jvObj [jss::consensus] = to_string(phase);
for (auto i = streamMap.begin ();
i != streamMap.end (); )
{
if (auto p = i->second.lock())
{
p->send (jvObj, true);
++i;
}
else
{
i = streamMap.erase (i);
}
}
}
}
void NetworkOPsImp::pubValidation (STValidation::ref val)
{
@@ -2517,6 +2566,13 @@ void NetworkOPsImp::reportFeeChange ()
}
}
void NetworkOPsImp::reportConsensusStateChange (ConsensusPhase phase)
{
m_job_queue.addJob (
jtCLIENT, "reportConsensusStateChange->pubConsensus",
[this, phase] (Job&) { pubConsensus(phase); });
}
// This routine should only be used to publish accepted or validated
// transactions.
Json::Value NetworkOPsImp::transJson(
@@ -2973,6 +3029,21 @@ bool NetworkOPsImp::unsubPeerStatus (std::uint64_t uSeq)
return mStreamMaps[sPeerStatus].erase (uSeq);
}
// <-- bool: true=added, false=already there
bool NetworkOPsImp::subConsensus (InfoSub::ref isrListener)
{
std::lock_guard sl (mSubLock);
return mStreamMaps[sConsensusPhase].emplace (
isrListener->getSeq (), isrListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubConsensus (std::uint64_t uSeq)
{
std::lock_guard sl (mSubLock);
return mStreamMaps[sConsensusPhase].erase (uSeq);
}
InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl)
{
std::lock_guard sl (mSubLock);