Send cluster reports

This commit is contained in:
JoelKatz
2013-07-10 15:41:13 -07:00
parent 8c1c954062
commit ecc864ac8e
11 changed files with 86 additions and 21 deletions

View File

@@ -24,6 +24,7 @@ NetworkOPs::NetworkOPs (LedgerMaster* pLedgerMaster)
, mValidating (false) , mValidating (false)
, mFeatureBlocked (false) , mFeatureBlocked (false)
, m_netTimer (this) , m_netTimer (this)
, m_clusterTimer (this)
, mLedgerMaster (pLedgerMaster) , mLedgerMaster (pLedgerMaster)
, mCloseTimeOffset (0) , mCloseTimeOffset (0)
, mLastCloseProposers (0) , mLastCloseProposers (0)
@@ -39,8 +40,14 @@ NetworkOPs::NetworkOPs (LedgerMaster* pLedgerMaster)
{ {
} }
void NetworkOPs::onDeadlineTimer (DeadlineTimer&) void NetworkOPs::onDeadlineTimer (DeadlineTimer& timer)
{ {
if (timer == m_clusterTimer)
{
doClusterReport();
return;
}
ScopedLock sl (getApp().getMasterLock ()); ScopedLock sl (getApp().getMasterLock ());
getApp().getLoadManager ().resetDeadlockDetector (); getApp().getLoadManager ().resetDeadlockDetector ();
@@ -628,6 +635,7 @@ void NetworkOPs::setFeatureBlocked ()
void NetworkOPs::setStateTimer () void NetworkOPs::setStateTimer ()
{ {
m_netTimer.setRecurringExpiration (LEDGER_GRANULARITY / 1000.0); m_netTimer.setRecurringExpiration (LEDGER_GRANULARITY / 1000.0);
m_clusterTimer.setRecurringExpiration (10.0);
} }
class ValidationCount class ValidationCount
@@ -2340,4 +2348,31 @@ void NetworkOPs::missingNodeInLedger (uint32 seq)
getApp().getInboundLedgers ().findCreate (hash, seq); getApp().getInboundLedgers ().findCreate (hash, seq);
} }
void NetworkOPs::doClusterReport ()
{
ClusterNodeStatus us("", getApp().getFeeTrack().getLocalFee(), getNetworkTimeNC());
if (!getApp().getUNL().nodeUpdate(getApp().getLocalCredentials().getNodePublic(), us))
{
WriteLog (lsDEBUG, NetworkOPs) << "To soon to send cluster update";
return;
}
std::map<RippleAddress, ClusterNodeStatus> nodes = getApp().getUNL().getClusterStatus();
protocol::TMCluster cluster;
for (std::map<RippleAddress, ClusterNodeStatus>::iterator it = nodes.begin(),
end = nodes.end(); it != end; ++it)
{
protocol::TMClusterNode& node = *cluster.add_clusternodes();
node.set_publickey(it->first.humanNodePublic());
node.set_reporttime(it->second.getReportTime());
node.set_nodeload(it->second.getLoadFee());
if (!it->second.getName().empty())
node.set_nodename(it->second.getName());
}
PackedMessage::pointer message = boost::make_shared<PackedMessage>(cluster, protocol::mtCLUSTER);
getApp().getPeers().relayMessageCluster (NULL, message);
}
// vim:ts=4 // vim:ts=4

View File

@@ -308,6 +308,8 @@ public:
uint256 getConsensusLCL (); uint256 getConsensusLCL ();
void reportFeeChange (); void reportFeeChange ();
void doClusterReport ();
//Helper function to generate SQL query to get transactions //Helper function to generate SQL query to get transactions
std::string transactionsSQL (std::string selection, const RippleAddress& account, std::string transactionsSQL (std::string selection, const RippleAddress& account,
int32 minLedger, int32 maxLedger, bool descending, uint32 offset, int limit, int32 minLedger, int32 maxLedger, bool descending, uint32 offset, int limit,
@@ -384,7 +386,8 @@ private:
bool mProposing, mValidating; bool mProposing, mValidating;
bool mFeatureBlocked; bool mFeatureBlocked;
boost::posix_time::ptime mConnectTime; boost::posix_time::ptime mConnectTime;
DeadlineTimer m_netTimer; DeadlineTimer m_netTimer;
DeadlineTimer m_clusterTimer;
boost::shared_ptr<LedgerConsensus> mConsensus; boost::shared_ptr<LedgerConsensus> mConsensus;
boost::unordered_map < uint160, boost::unordered_map < uint160,
std::list<LedgerProposal::pointer> > mStoredProposals; std::list<LedgerProposal::pointer> > mStoredProposals;

View File

@@ -59,6 +59,8 @@ public:
virtual bool isConnected () const = 0; virtual bool isConnected () const = 0;
virtual bool isInCluster () const = 0;
virtual bool isInbound () const = 0; virtual bool isInbound () const = 0;
virtual bool isOutbound () const = 0; virtual bool isOutbound () const = 0;

View File

@@ -80,7 +80,7 @@ message TMHello
// The status of a node in our cluster // The status of a node in our cluster
message TMClusterNode message TMClusterNode
{ {
required bytes publicKey = 1; required string publicKey = 1;
required uint32 reportTime = 2; required uint32 reportTime = 2;
required uint32 nodeLoad = 3; required uint32 nodeLoad = 3;
optional string nodeName = 4; optional string nodeName = 4;

View File

@@ -35,15 +35,15 @@ public:
return mReportTime; return mReportTime;
} }
void update(ClusterNodeStatus const& status) bool update(ClusterNodeStatus const& status)
{ {
if (status.mReportTime > mReportTime) if (status.mReportTime <= mReportTime)
{ return false;
mLoadFee = status.mLoadFee; mLoadFee = status.mLoadFee;
mReportTime = status.mReportTime; mReportTime = status.mReportTime;
if (mNodeName.empty() || !status.mNodeName.empty()) if (mNodeName.empty() || !status.mNodeName.empty())
mNodeName = status.mNodeName; mNodeName = status.mNodeName;
} return true;
} }
private: private:

View File

@@ -21,6 +21,7 @@ public:
// Send message to network. // Send message to network.
virtual int relayMessage (Peer* fromPeer, const PackedMessage::pointer& msg) = 0; virtual int relayMessage (Peer* fromPeer, const PackedMessage::pointer& msg) = 0;
virtual int relayMessageCluster (Peer* fromPeer, const PackedMessage::pointer& msg) = 0;
virtual void relayMessageTo (const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg) = 0; virtual void relayMessageTo (const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg) = 0;
virtual void relayMessageBut (const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg) = 0; virtual void relayMessageBut (const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg) = 0;

View File

@@ -70,6 +70,10 @@ public:
{ {
return mHelloed && !mDetaching; return mHelloed && !mDetaching;
} }
bool isInCluster () const
{
return mCluster;
}
bool isInbound () const bool isInbound () const
{ {
return mInbound; return mInbound;
@@ -1425,6 +1429,7 @@ void PeerImp::recvCluster (protocol::TMCluster& packet)
applyLoadCharge(LT_UnwantedData); applyLoadCharge(LT_UnwantedData);
return; return;
} }
for (int i = 0; i < packet.clusternodes().size(); ++i) for (int i = 0; i < packet.clusternodes().size(); ++i)
{ {
protocol::TMClusterNode const& node = packet.clusternodes(i); protocol::TMClusterNode const& node = packet.clusternodes(i);

View File

@@ -64,6 +64,8 @@ public:
virtual bool isConnected () const = 0; virtual bool isConnected () const = 0;
virtual bool isInCluster () const = 0;
virtual bool isInbound () const = 0; virtual bool isInbound () const = 0;
virtual bool isOutbound () const = 0; virtual bool isOutbound () const = 0;

View File

@@ -29,6 +29,7 @@ public:
// Send message to network. // Send message to network.
int relayMessage (Peer* fromPeer, const PackedMessage::pointer& msg); int relayMessage (Peer* fromPeer, const PackedMessage::pointer& msg);
int relayMessageCluster (Peer* fromPeer, const PackedMessage::pointer& msg);
void relayMessageTo (const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg); void relayMessageTo (const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg);
void relayMessageBut (const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg); void relayMessageBut (const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg);
@@ -365,6 +366,22 @@ int Peers::relayMessage (Peer* fromPeer, const PackedMessage::pointer& msg)
return sentTo; return sentTo;
} }
int Peers::relayMessageCluster (Peer* fromPeer, const PackedMessage::pointer& msg)
{
int sentTo = 0;
std::vector<Peer::pointer> peerVector = getPeerVector ();
BOOST_FOREACH (Peer::ref peer, peerVector)
{
if ((!fromPeer || ! (peer.get () == fromPeer)) && peer->isConnected () && peer->isInCluster ())
{
++sentTo;
peer->sendPacket (msg, false);
}
}
return sentTo;
}
void Peers::relayMessageBut (const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg) void Peers::relayMessageBut (const std::set<uint64>& fromPeers, const PackedMessage::pointer& msg)
{ {
// Relay message to all but the specified peers // Relay message to all but the specified peers

View File

@@ -322,22 +322,22 @@ public:
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
void nodeUpdate (const RippleAddress& naNodePublic, ClusterNodeStatus const& cnsStatus) bool nodeUpdate (const RippleAddress& naNodePublic, ClusterNodeStatus const& cnsStatus)
{ {
boost::recursive_mutex::scoped_lock sl (mUNLLock); boost::recursive_mutex::scoped_lock sl (mUNLLock);
m_clusterNodes[naNodePublic].update(cnsStatus); return m_clusterNodes[naNodePublic].update(cnsStatus);
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
std::map<RippleAddress, ClusterNodeStatus> getClusterStatus () std::map<RippleAddress, ClusterNodeStatus> getClusterStatus ()
{ {
std::map<RippleAddress, ClusterNodeStatus> ret; std::map<RippleAddress, ClusterNodeStatus> ret;
{ {
boost::recursive_mutex::scoped_lock sl (mUNLLock); boost::recursive_mutex::scoped_lock sl (mUNLLock);
ret = m_clusterNodes; ret = m_clusterNodes;
} }
return ret; return ret;
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------

View File

@@ -45,7 +45,7 @@ public:
virtual bool nodeInUNL (const RippleAddress& naNodePublic) = 0; virtual bool nodeInUNL (const RippleAddress& naNodePublic) = 0;
virtual bool nodeInCluster (const RippleAddress& naNodePublic) = 0; virtual bool nodeInCluster (const RippleAddress& naNodePublic) = 0;
virtual bool nodeInCluster (const RippleAddress& naNodePublic, std::string& name) = 0; virtual bool nodeInCluster (const RippleAddress& naNodePublic, std::string& name) = 0;
virtual void nodeUpdate (const RippleAddress& naNodePublic, ClusterNodeStatus const& cnsStatus) = 0; virtual bool nodeUpdate (const RippleAddress& naNodePublic, ClusterNodeStatus const& cnsStatus) = 0;
virtual std::map<RippleAddress, ClusterNodeStatus> getClusterStatus () = 0; virtual std::map<RippleAddress, ClusterNodeStatus> getClusterStatus () = 0;
virtual uint32 getClusterFee () = 0; virtual uint32 getClusterFee () = 0;