diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index b3e28437e..c3c46ef6e 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -24,6 +24,7 @@ NetworkOPs::NetworkOPs (LedgerMaster* pLedgerMaster) , mValidating (false) , mFeatureBlocked (false) , m_netTimer (this) + , m_clusterTimer (this) , mLedgerMaster (pLedgerMaster) , mCloseTimeOffset (0) , mLastCloseProposers (0) @@ -39,8 +40,14 @@ NetworkOPs::NetworkOPs (LedgerMaster* pLedgerMaster) { } -void NetworkOPs::onDeadlineTimer (DeadlineTimer&) +void NetworkOPs::onDeadlineTimer (DeadlineTimer& timer) { + if (timer == m_clusterTimer) + { + doClusterReport(); + return; + } + ScopedLock sl (getApp().getMasterLock ()); getApp().getLoadManager ().resetDeadlockDetector (); @@ -628,6 +635,7 @@ void NetworkOPs::setFeatureBlocked () void NetworkOPs::setStateTimer () { m_netTimer.setRecurringExpiration (LEDGER_GRANULARITY / 1000.0); + m_clusterTimer.setRecurringExpiration (10.0); } class ValidationCount @@ -2340,4 +2348,31 @@ void NetworkOPs::missingNodeInLedger (uint32 seq) getApp().getInboundLedgers ().findCreate (hash, seq); } +void NetworkOPs::doClusterReport () +{ + ClusterNodeStatus us("", getApp().getFeeTrack().getLocalFee(), getNetworkTimeNC()); + if (!getApp().getUNL().nodeUpdate(getApp().getLocalCredentials().getNodePublic(), us)) + { + WriteLog (lsDEBUG, NetworkOPs) << "To soon to send cluster update"; + return; + } + + std::map nodes = getApp().getUNL().getClusterStatus(); + + protocol::TMCluster cluster; + for (std::map::iterator it = nodes.begin(), + end = nodes.end(); it != end; ++it) + { + protocol::TMClusterNode& node = *cluster.add_clusternodes(); + node.set_publickey(it->first.humanNodePublic()); + node.set_reporttime(it->second.getReportTime()); + node.set_nodeload(it->second.getLoadFee()); + if (!it->second.getName().empty()) + node.set_nodename(it->second.getName()); + } + + PackedMessage::pointer message = boost::make_shared(cluster, protocol::mtCLUSTER); + getApp().getPeers().relayMessageCluster (NULL, message); +} + // vim:ts=4 diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index dc382f5b8..f923d8aa6 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -308,6 +308,8 @@ public: uint256 getConsensusLCL (); void reportFeeChange (); + void doClusterReport (); + //Helper function to generate SQL query to get transactions std::string transactionsSQL (std::string selection, const RippleAddress& account, int32 minLedger, int32 maxLedger, bool descending, uint32 offset, int limit, @@ -384,7 +386,8 @@ private: bool mProposing, mValidating; bool mFeatureBlocked; boost::posix_time::ptime mConnectTime; - DeadlineTimer m_netTimer; + DeadlineTimer m_netTimer; + DeadlineTimer m_clusterTimer; boost::shared_ptr mConsensus; boost::unordered_map < uint160, std::list > mStoredProposals; diff --git a/src/cpp/ripple/Peer.h b/src/cpp/ripple/Peer.h index e7987a9b5..5aef8aafa 100644 --- a/src/cpp/ripple/Peer.h +++ b/src/cpp/ripple/Peer.h @@ -59,6 +59,8 @@ public: virtual bool isConnected () const = 0; + virtual bool isInCluster () const = 0; + virtual bool isInbound () const = 0; virtual bool isOutbound () const = 0; diff --git a/src/cpp/ripple/ripple.proto b/src/cpp/ripple/ripple.proto index cc098a8bd..de65a5ba5 100644 --- a/src/cpp/ripple/ripple.proto +++ b/src/cpp/ripple/ripple.proto @@ -80,7 +80,7 @@ message TMHello // The status of a node in our cluster message TMClusterNode { - required bytes publicKey = 1; + required string publicKey = 1; required uint32 reportTime = 2; required uint32 nodeLoad = 3; optional string nodeName = 4; diff --git a/src/cpp/ripple/ripple_ClusterNodeStatus.h b/src/cpp/ripple/ripple_ClusterNodeStatus.h index aa2afdd1e..6520e59c5 100644 --- a/src/cpp/ripple/ripple_ClusterNodeStatus.h +++ b/src/cpp/ripple/ripple_ClusterNodeStatus.h @@ -35,15 +35,15 @@ public: return mReportTime; } - void update(ClusterNodeStatus const& status) + bool update(ClusterNodeStatus const& status) { - if (status.mReportTime > mReportTime) - { - mLoadFee = status.mLoadFee; - mReportTime = status.mReportTime; - if (mNodeName.empty() || !status.mNodeName.empty()) - mNodeName = status.mNodeName; - } + if (status.mReportTime <= mReportTime) + return false; + mLoadFee = status.mLoadFee; + mReportTime = status.mReportTime; + if (mNodeName.empty() || !status.mNodeName.empty()) + mNodeName = status.mNodeName; + return true; } private: diff --git a/src/cpp/ripple/ripple_IPeers.h b/src/cpp/ripple/ripple_IPeers.h index 91e179581..bfa44ea4b 100644 --- a/src/cpp/ripple/ripple_IPeers.h +++ b/src/cpp/ripple/ripple_IPeers.h @@ -21,6 +21,7 @@ public: // Send message to network. virtual int relayMessage (Peer* fromPeer, const PackedMessage::pointer& msg) = 0; + virtual int relayMessageCluster (Peer* fromPeer, const PackedMessage::pointer& msg) = 0; virtual void relayMessageTo (const std::set& fromPeers, const PackedMessage::pointer& msg) = 0; virtual void relayMessageBut (const std::set& fromPeers, const PackedMessage::pointer& msg) = 0; diff --git a/src/cpp/ripple/ripple_Peer.cpp b/src/cpp/ripple/ripple_Peer.cpp index 8f5c1634c..ab50643e5 100644 --- a/src/cpp/ripple/ripple_Peer.cpp +++ b/src/cpp/ripple/ripple_Peer.cpp @@ -70,6 +70,10 @@ public: { return mHelloed && !mDetaching; } + bool isInCluster () const + { + return mCluster; + } bool isInbound () const { return mInbound; @@ -1425,6 +1429,7 @@ void PeerImp::recvCluster (protocol::TMCluster& packet) applyLoadCharge(LT_UnwantedData); return; } + for (int i = 0; i < packet.clusternodes().size(); ++i) { protocol::TMClusterNode const& node = packet.clusternodes(i); diff --git a/src/cpp/ripple/ripple_Peer.h b/src/cpp/ripple/ripple_Peer.h index 49f05b6b9..e24003d6a 100644 --- a/src/cpp/ripple/ripple_Peer.h +++ b/src/cpp/ripple/ripple_Peer.h @@ -64,6 +64,8 @@ public: virtual bool isConnected () const = 0; + virtual bool isInCluster () const = 0; + virtual bool isInbound () const = 0; virtual bool isOutbound () const = 0; diff --git a/src/cpp/ripple/ripple_Peers.cpp b/src/cpp/ripple/ripple_Peers.cpp index ade55fbc8..2a6ae72a1 100644 --- a/src/cpp/ripple/ripple_Peers.cpp +++ b/src/cpp/ripple/ripple_Peers.cpp @@ -29,6 +29,7 @@ public: // Send message to network. int relayMessage (Peer* fromPeer, const PackedMessage::pointer& msg); + int relayMessageCluster (Peer* fromPeer, const PackedMessage::pointer& msg); void relayMessageTo (const std::set& fromPeers, const PackedMessage::pointer& msg); void relayMessageBut (const std::set& fromPeers, const PackedMessage::pointer& msg); @@ -365,6 +366,22 @@ int Peers::relayMessage (Peer* fromPeer, const PackedMessage::pointer& msg) return sentTo; } +int Peers::relayMessageCluster (Peer* fromPeer, const PackedMessage::pointer& msg) +{ + int sentTo = 0; + std::vector peerVector = getPeerVector (); + BOOST_FOREACH (Peer::ref peer, peerVector) + { + if ((!fromPeer || ! (peer.get () == fromPeer)) && peer->isConnected () && peer->isInCluster ()) + { + ++sentTo; + peer->sendPacket (msg, false); + } + } + + return sentTo; +} + void Peers::relayMessageBut (const std::set& fromPeers, const PackedMessage::pointer& msg) { // Relay message to all but the specified peers diff --git a/src/cpp/ripple/ripple_UniqueNodeList.cpp b/src/cpp/ripple/ripple_UniqueNodeList.cpp index a1ffbc3b7..1bceada5a 100644 --- a/src/cpp/ripple/ripple_UniqueNodeList.cpp +++ b/src/cpp/ripple/ripple_UniqueNodeList.cpp @@ -322,22 +322,22 @@ public: //-------------------------------------------------------------------------- - void nodeUpdate (const RippleAddress& naNodePublic, ClusterNodeStatus const& cnsStatus) + bool nodeUpdate (const RippleAddress& naNodePublic, ClusterNodeStatus const& cnsStatus) { - boost::recursive_mutex::scoped_lock sl (mUNLLock); - m_clusterNodes[naNodePublic].update(cnsStatus); + boost::recursive_mutex::scoped_lock sl (mUNLLock); + return m_clusterNodes[naNodePublic].update(cnsStatus); } //-------------------------------------------------------------------------- std::map getClusterStatus () { - std::map ret; - { - boost::recursive_mutex::scoped_lock sl (mUNLLock); - ret = m_clusterNodes; - } - return ret; + std::map ret; + { + boost::recursive_mutex::scoped_lock sl (mUNLLock); + ret = m_clusterNodes; + } + return ret; } //-------------------------------------------------------------------------- diff --git a/src/cpp/ripple/ripple_UniqueNodeList.h b/src/cpp/ripple/ripple_UniqueNodeList.h index 29f92b83a..6e5a6f039 100644 --- a/src/cpp/ripple/ripple_UniqueNodeList.h +++ b/src/cpp/ripple/ripple_UniqueNodeList.h @@ -45,7 +45,7 @@ public: virtual bool nodeInUNL (const RippleAddress& naNodePublic) = 0; virtual bool nodeInCluster (const RippleAddress& naNodePublic) = 0; virtual bool nodeInCluster (const RippleAddress& naNodePublic, std::string& name) = 0; - virtual void nodeUpdate (const RippleAddress& naNodePublic, ClusterNodeStatus const& cnsStatus) = 0; + virtual bool nodeUpdate (const RippleAddress& naNodePublic, ClusterNodeStatus const& cnsStatus) = 0; virtual std::map getClusterStatus () = 0; virtual uint32 getClusterFee () = 0;