Allow subscription to peer status changes (RIPD-579)

Subscribe to "peer_status" stream (admin only) permits
reception of "peerStatusChange" notifications.

These can include the event the peer is reporting, the peer's
new status, the peer's currently accepted ledger hash and sequence,
the peer's network time, and the range of ledgers the peer has
available for remote querying.
This commit is contained in:
JoelKatz
2015-10-19 16:09:01 -07:00
committed by Nik Bougalis
parent 75bed5efcf
commit 6dbbb7406c
6 changed files with 133 additions and 0 deletions

View File

@@ -382,6 +382,10 @@ public:
bool subValidations (InfoSub::ref ispListener) override;
bool unsubValidations (std::uint64_t uListener) override;
bool subPeerStatus (InfoSub::ref ispListener) override;
bool unsubPeerStatus (std::uint64_t uListener) override;
void pubPeerStatus (std::function<Json::Value(void)> const&) override;
InfoSub::pointer findRpcSub (std::string const& strUrl) override;
InfoSub::pointer addRpcSub (
std::string const& strUrl, InfoSub::ref) override;
@@ -464,6 +468,7 @@ private:
SubMapType mSubTransactions; // All accepted transactions.
SubMapType mSubRTTransactions; // All proposed and accepted transactions.
SubMapType mSubValidations; // Received validations.
SubMapType mSubPeerStatus; // peer status changes
std::uint32_t mLastLoadBase;
std::uint32_t mLastLoadFactor;
@@ -1540,6 +1545,33 @@ void NetworkOPsImp::pubValidation (STValidation::ref val)
}
}
void NetworkOPsImp::pubPeerStatus (
std::function<Json::Value(void)> const& func)
{
ScopedLockType sl (mSubLock);
if (!mSubPeerStatus.empty ())
{
Json::Value jvObj (func());
jvObj [jss::type] = "peerStatusChange";
for (auto i = mSubPeerStatus.begin (); i != mSubPeerStatus.end (); )
{
InfoSub::pointer p = i->second.lock ();
if (p)
{
p->send (jvObj, true);
++i;
}
else
{
i = mSubValidations.erase (i);
}
}
}
}
void NetworkOPsImp::setMode (OperatingMode om)
{
@@ -2540,6 +2572,20 @@ bool NetworkOPsImp::unsubValidations (std::uint64_t uSeq)
return mSubValidations.erase (uSeq);
}
// <-- bool: true=added, false=already there
bool NetworkOPsImp::subPeerStatus (InfoSub::ref isrListener)
{
ScopedLockType sl (mSubLock);
return mSubPeerStatus.emplace (isrListener->getSeq (), isrListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubPeerStatus (std::uint64_t uSeq)
{
ScopedLockType sl (mSubLock);
return mSubPeerStatus.erase (uSeq);
}
InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl)
{
ScopedLockType sl (mSubLock);

View File

@@ -101,6 +101,10 @@ public:
virtual bool subValidations (ref ispListener) = 0;
virtual bool unsubValidations (std::uint64_t uListener) = 0;
virtual bool subPeerStatus (ref ispListener) = 0;
virtual bool unsubPeerStatus (std::uint64_t uListener) = 0;
virtual void pubPeerStatus (std::function<Json::Value(void)> const&) = 0;
// VFALCO TODO Remove
// This was added for one particular partner, it
// "pushes" subscription data to a particular URL.

View File

@@ -58,6 +58,7 @@ InfoSub::~InfoSub ()
m_source.unsubLedger (mSeq);
m_source.unsubServer (mSeq);
m_source.unsubValidations (mSeq);
m_source.unsubPeerStatus (mSeq);
// Use the internal unsubscribe so that it won't call
// back to us and modify its own parameter

View File

@@ -1335,6 +1335,78 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
{
checkSanity (m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
}
app_.getOPs().pubPeerStatus (
[=]() -> Json::Value
{
Json::Value j = Json::objectValue;
if (m->has_newstatus ())
{
switch (m->newstatus ())
{
case protocol::nsCONNECTING:
j[jss::status] = "CONNECTING";
break;
case protocol::nsCONNECTED:
j[jss::status] = "CONNECTED";
break;
case protocol::nsMONITORING:
j[jss::status] = "MONITORING";
break;
case protocol::nsVALIDATING:
j[jss::status] = "VALIDATING";
break;
case protocol::nsSHUTTING:
j[jss::status] = "SHUTTING";
break;
}
}
if (m->has_newevent())
{
switch (m->newevent ())
{
case protocol::neCLOSING_LEDGER:
j[jss::action] = "CLOSING_LEDGER";
break;
case protocol::neACCEPTED_LEDGER:
j[jss::action] = "ACCEPTED_LEDGER";
break;
case protocol::neSWITCHED_LEDGER:
j[jss::action] = "SWITCHED_LEDGER";
break;
case protocol::neLOST_SYNC:
j[jss::action] = "LOST_SYNC";
break;
}
}
if (m->has_ledgerseq ())
{
j[jss::ledger_index] = m->ledgerseq();
}
if (m->has_ledgerhash ())
{
j[jss::ledger_hash] = to_string (closedLedgerHash_);
}
if (m->has_networktime ())
{
j[jss::date] = Json::UInt (m->networktime());
}
if (m->has_firstseq () && m->has_lastseq ())
{
j[jss::ledger_index_min] =
Json::UInt (m->firstseq ());
j[jss::ledger_index_max] =
Json::UInt (m->lastseq ());
}
return j;
});
}
void

View File

@@ -143,6 +143,13 @@ Json::Value doSubscribe (RPC::Context& context)
{
context.netOps.subValidations (ispSub);
}
else if (streamName == "peer_status")
{
if (context.role != Role::ADMIN)
jvResult[jss::error] = "noPermission";
else
context.netOps.subPeerStatus (ispSub);
}
else
{
jvResult[jss::error] = "unknownStream";

View File

@@ -83,6 +83,9 @@ Json::Value doUnsubscribe (RPC::Context& context)
else if (streamName == "validations")
context.netOps.unsubValidations (ispSub->getSeq ());
else if (streamName == "peer_status")
context.netOps.unsubPeerStatus (ispSub->getSeq ());
else
jvResult[jss::error] = "Unknown stream: " + streamName;
}