Separate cluster tracking from UNL:

* Simplify code
* Leverage C++14 transparent comparators
This commit is contained in:
Nik Bougalis
2015-10-07 01:40:31 -07:00
parent 41ff751cd2
commit 818130a8c0
17 changed files with 603 additions and 208 deletions

View File

@@ -2582,8 +2582,14 @@
</ClCompile>
<ClInclude Include="..\..\src\ripple\nodestore\Types.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\overlay\ClusterNodeStatus.h">
<ClInclude Include="..\..\src\ripple\overlay\Cluster.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\overlay\ClusterNode.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\overlay\impl\Cluster.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\overlay\impl\ConnectAttempt.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
@@ -2648,6 +2654,10 @@
</ClInclude>
<None Include="..\..\src\ripple\overlay\README.md">
</None>
<ClCompile Include="..\..\src\ripple\overlay\tests\cluster_test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\overlay\tests\manifest_test.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>

View File

@@ -3255,9 +3255,15 @@
<ClInclude Include="..\..\src\ripple\nodestore\Types.h">
<Filter>ripple\nodestore</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\overlay\ClusterNodeStatus.h">
<ClInclude Include="..\..\src\ripple\overlay\Cluster.h">
<Filter>ripple\overlay</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\overlay\ClusterNode.h">
<Filter>ripple\overlay</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\overlay\impl\Cluster.cpp">
<Filter>ripple\overlay\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\overlay\impl\ConnectAttempt.cpp">
<Filter>ripple\overlay\impl</Filter>
</ClCompile>
@@ -3330,6 +3336,9 @@
<None Include="..\..\src\ripple\overlay\README.md">
<Filter>ripple\overlay</Filter>
</None>
<ClCompile Include="..\..\src\ripple\overlay\tests\cluster_test.cpp">
<Filter>ripple\overlay\tests</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\overlay\tests\manifest_test.cpp">
<Filter>ripple\overlay\tests</Filter>
</ClCompile>

View File

@@ -59,6 +59,7 @@
#include <ripple/nodestore/Database.h>
#include <ripple/nodestore/DummyScheduler.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/make_Overlay.h>
#include <ripple/protocol/Indexes.h>
#include <ripple/protocol/SecretKey.h>
@@ -320,6 +321,7 @@ public:
std::unique_ptr <InboundTransactions> m_inboundTransactions;
TaggedCache <uint256, AcceptedLedger> m_acceptedLedgerCache;
std::unique_ptr <NetworkOPs> m_networkOPs;
std::unique_ptr <Cluster> cluster_;
std::unique_ptr <UniqueNodeList> m_deprecatedUNL;
std::unique_ptr <ServerHandler> serverHandler_;
std::unique_ptr <AmendmentTable> m_amendmentTable;
@@ -660,6 +662,11 @@ public:
return *m_deprecatedUNL;
}
Cluster& cluster () override
{
return *cluster_;
}
SHAMapStore& getSHAMapStore () override
{
return *m_shaMapStore;
@@ -982,6 +989,8 @@ void ApplicationImp::setup()
m_orderBookDB.setup (getLedgerMaster ().getCurrentLedger ());
cluster_ = make_Cluster (config (), logs_->journal("Overlay"));
// Begin validation and ip maintenance.
//
// - LocalCredentials maintains local information: including identity

View File

@@ -64,6 +64,7 @@ class TimeKeeper;
class TransactionMaster;
class TxQ;
class Validations;
class Cluster;
class DatabaseCon;
class SHAMapStore;
@@ -117,6 +118,7 @@ public:
virtual Overlay& overlay () = 0;
virtual TxQ& getTxQ() = 0;
virtual UniqueNodeList& getUNL () = 0;
virtual Cluster& cluster () = 0;
virtual Validations& getValidations () = 0;
virtual NodeStore::Database& getNodeStore () = 0;
virtual InboundLedgers& getInboundLedgers () = 0;

View File

@@ -55,7 +55,8 @@
#include <ripple/crypto/RandomNumbers.h>
#include <ripple/crypto/RFC1751.h>
#include <ripple/json/to_string.h>
#include <ripple/overlay/ClusterNodeStatus.h>
#include <ripple/overlay/ClusterNode.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/predicates.h>
#include <ripple/protocol/BuildInfo.h>
@@ -669,28 +670,31 @@ void NetworkOPsImp::processHeartbeatTimer ()
void NetworkOPsImp::processClusterTimer ()
{
bool synced = (m_ledgerMaster.getValidatedLedgerAge() <= 240);
ClusterNodeStatus us("", synced ? app_.getFeeTrack().getLocalFee() : 0,
app_.timeKeeper().now().time_since_epoch().count());
auto& unl = app_.getUNL();
if (!unl.nodeUpdate(app_.getLocalCredentials().getNodePublic(), us))
bool const update = app_.cluster().update(
app_.getLocalCredentials().getNodePublic(),
"",
(m_ledgerMaster.getValidatedLedgerAge() <= 240)
? app_.getFeeTrack().getLocalFee()
: 0,
app_.timeKeeper().now().time_since_epoch().count());
if (!update)
{
m_journal.debug << "To soon to send cluster update";
m_journal.debug << "Too soon to send cluster update";
return;
}
auto nodes = unl.getClusterStatus();
protocol::TMCluster cluster;
for (auto& it: nodes)
{
protocol::TMClusterNode& node = *cluster.add_clusternodes();
node.set_publickey(it.first.humanNodePublic());
node.set_reporttime(it.second.getReportTime());
node.set_nodeload(it.second.getLoadFee());
if (!it.second.getName().empty())
node.set_nodename(it.second.getName());
}
app_.cluster().for_each(
[&cluster](ClusterNode const& node)
{
protocol::TMClusterNode& n = *cluster.add_clusternodes();
n.set_publickey(node.identity().humanNodePublic());
n.set_reporttime(node.getReportTime());
n.set_nodeload(node.getLoadFee());
if (!node.name().empty())
n.set_nodename(node.name());
});
Resource::Gossip gossip = app_.getResourceManager().exportConsumers();
for (auto& item: gossip.items)

View File

@@ -22,7 +22,6 @@
#include <ripple/app/main/LocalCredentials.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/overlay/ClusterNodeStatus.h>
#include <ripple/app/misc/UniqueNodeList.h>
#include <ripple/basics/Log.h>
#include <ripple/protocol/digest.h>
@@ -228,8 +227,6 @@ private:
boost::posix_time::ptime mtpFetchNext; // Time of to start next fetch.
beast::DeadlineTimer m_fetchTimer; // Timer to start fetching.
std::map<RippleAddress, ClusterNodeStatus> m_clusterNodes;
std::string node_file_name_;
std::string node_file_path_;
@@ -271,17 +268,6 @@ public:
bool nodeInUNL (RippleAddress const& naNodePublic);
bool nodeInCluster (RippleAddress const& naNodePublic);
bool nodeInCluster (RippleAddress const& naNodePublic, std::string& name);
bool nodeUpdate (RippleAddress const& naNodePublic, ClusterNodeStatus const& cnsStatus);
std::map<RippleAddress, ClusterNodeStatus> getClusterStatus();
std::uint32_t getClusterFee();
void addClusterStatus (Json::Value& obj);
void nodeBootstrap();
bool nodeLoad (boost::filesystem::path pConfig);
@@ -654,105 +640,6 @@ bool UniqueNodeListImp::nodeInUNL (RippleAddress const& naNodePublic)
//--------------------------------------------------------------------------
bool UniqueNodeListImp::nodeInCluster (RippleAddress const& naNodePublic)
{
std::lock_guard <std::recursive_mutex> sl (mUNLLock);
return m_clusterNodes.end () != m_clusterNodes.find (naNodePublic);
}
//--------------------------------------------------------------------------
bool UniqueNodeListImp::nodeInCluster (RippleAddress const& naNodePublic, std::string& name)
{
std::lock_guard <std::recursive_mutex> sl (mUNLLock);
std::map<RippleAddress, ClusterNodeStatus>::iterator it = m_clusterNodes.find (naNodePublic);
if (it == m_clusterNodes.end ())
return false;
name = it->second.getName();
return true;
}
//--------------------------------------------------------------------------
bool UniqueNodeListImp::nodeUpdate (RippleAddress const& naNodePublic, ClusterNodeStatus const& cnsStatus)
{
std::lock_guard <std::recursive_mutex> sl (mUNLLock);
return m_clusterNodes[naNodePublic].update(cnsStatus);
}
//--------------------------------------------------------------------------
std::map<RippleAddress, ClusterNodeStatus>
UniqueNodeListImp::getClusterStatus()
{
std::map<RippleAddress, ClusterNodeStatus> ret;
{
std::lock_guard <std::recursive_mutex> sl (mUNLLock);
ret = m_clusterNodes;
}
return ret;
}
//--------------------------------------------------------------------------
std::uint32_t UniqueNodeListImp::getClusterFee()
{
auto const thresh = app_.timeKeeper().now().time_since_epoch().count() - 90;
std::vector<std::uint32_t> fees;
{
std::lock_guard <std::recursive_mutex> sl (mUNLLock);
{
for (std::map<RippleAddress, ClusterNodeStatus>::iterator it = m_clusterNodes.begin(),
end = m_clusterNodes.end(); it != end; ++it)
{
if (it->second.getReportTime() >= thresh)
fees.push_back(it->second.getLoadFee());
}
}
}
if (fees.empty())
return 0;
std::sort (fees.begin(), fees.end());
return fees[fees.size() / 2];
}
//--------------------------------------------------------------------------
void UniqueNodeListImp::addClusterStatus (Json::Value& obj)
{
std::lock_guard <std::recursive_mutex> sl (mUNLLock);
if (m_clusterNodes.size() > 1) // nodes other than us
{
auto const now = app_.timeKeeper().now().time_since_epoch().count();
std::uint32_t ref = app_.getFeeTrack().getLoadBase();
Json::Value& nodes = (obj[jss::cluster] = Json::objectValue);
for (std::map<RippleAddress, ClusterNodeStatus>::iterator it = m_clusterNodes.begin(),
end = m_clusterNodes.end(); it != end; ++it)
{
if (it->first != app_.getLocalCredentials().getNodePublic())
{
Json::Value& node = nodes[it->first.humanNodePublic()];
if (!it->second.getName().empty())
node["tag"] = it->second.getName();
if ((it->second.getLoadFee() != ref) && (it->second.getLoadFee() != 0))
node["fee"] = static_cast<double>(it->second.getLoadFee()) / ref;
if (it->second.getReportTime() != 0)
node["age"] = (it->second.getReportTime() >= now) ? 0 : (now - it->second.getReportTime());
}
}
}
}
//--------------------------------------------------------------------------
void UniqueNodeListImp::nodeBootstrap()
{
int iDomains = 0;
@@ -1017,22 +904,6 @@ bool UniqueNodeListImp::miscSave()
void UniqueNodeListImp::trustedLoad()
{
boost::regex rNode ("\\`\\s*(\\S+)[\\s]*(.*)\\'");
for (auto const& c : app_.config().CLUSTER_NODES)
{
boost::smatch match;
if (boost::regex_match (c, match, rNode))
{
RippleAddress a = RippleAddress::createNodePublic (match[1]);
if (a.isValid ())
m_clusterNodes.insert (std::make_pair (a, ClusterNodeStatus(match[2])));
}
else
JLOG (j_.warning) << "Entry in cluster list invalid: '" << c << "'";
}
auto db = app_.getWalletDB ().checkoutDb ();
std::lock_guard <std::recursive_mutex> slUNL (mUNLLock);

View File

@@ -21,7 +21,6 @@
#define RIPPLE_APP_PEERS_UNIQUENODELIST_H_INCLUDED
#include <ripple/app/main/Application.h>
#include <ripple/overlay/ClusterNodeStatus.h>
#include <ripple/protocol/PublicKey.h>
#include <ripple/protocol/RippleAddress.h>
#include <beast/threads/Stoppable.h>
@@ -69,12 +68,6 @@ public:
virtual void nodeScore () = 0;
virtual bool nodeInUNL (RippleAddress const& naNodePublic) = 0;
virtual bool nodeInCluster (RippleAddress const& naNodePublic) = 0;
virtual bool nodeInCluster (RippleAddress const& naNodePublic, std::string& name) = 0;
virtual bool nodeUpdate (RippleAddress const& naNodePublic, ClusterNodeStatus const& cnsStatus) = 0;
virtual std::map<RippleAddress, ClusterNodeStatus> getClusterStatus () = 0;
virtual std::uint32_t getClusterFee () = 0;
virtual void addClusterStatus (Json::Value&) = 0;
virtual void nodeBootstrap () = 0;
virtual bool nodeLoad (boost::filesystem::path pConfig) = 0;

View File

@@ -0,0 +1,115 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_CLUSTER_H_INCLUDED
#define RIPPLE_OVERLAY_CLUSTER_H_INCLUDED
#include <BeastConfig.h>
#include <ripple/app/main/Application.h>
#include <ripple/core/Config.h>
#include <ripple/overlay/ClusterNode.h>
#include <ripple/protocol/RippleAddress.h>
#include <beast/hash/uhash.h>
#include <beast/utility/Journal.h>
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <type_traits>
namespace ripple {
class Cluster
{
private:
struct Comparator
{
using is_transparent = std::true_type;
bool
operator() (
ClusterNode const& lhs,
ClusterNode const& rhs) const
{
return lhs.identity() < rhs.identity();
}
bool
operator() (
ClusterNode const& lhs,
RippleAddress const& rhs) const
{
return lhs.identity() < rhs;
}
bool
operator() (
RippleAddress const& lhs,
ClusterNode const& rhs) const
{
return lhs < rhs.identity();
}
};
std::set<ClusterNode, Comparator> nodes_;
std::mutex mutable mutex_;
beast::Journal mutable j_;
public:
Cluster (beast::Journal j);
/** Determines whether a node belongs in the cluster
@return empty optional if the node isn't a member,
otherwise, the node's name (which may be
empty).
*/
boost::optional<std::string>
member (RippleAddress const& node) const;
/** The number of nodes in the cluster list. */
std::size_t
size() const;
/** Store information about the state of a cluster node.
@param identity The node's public identity
@param name The node's name (may be empty)
@return true if we updated our information
*/
bool
update (
RippleAddress const& identity,
std::string name,
std::uint32_t loadFee = 0,
std::uint32_t reportTime = 0);
/** Invokes the callback once for every cluster node.
@note You are not allowed to call `update` from
within the callback.
*/
void
for_each (
std::function<void(ClusterNode const&)> func) const;
};
std::unique_ptr<Cluster>
make_Cluster (Config const& config, beast::Journal j);
} // ripple
#endif

View File

@@ -20,59 +20,54 @@
#ifndef RIPPLE_APP_PEERS_CLUSTERNODESTATUS_H_INCLUDED
#define RIPPLE_APP_PEERS_CLUSTERNODESTATUS_H_INCLUDED
#include <ripple/protocol/RippleAddress.h>
#include <cstdint>
#include <string>
namespace ripple {
class ClusterNodeStatus
class ClusterNode
{
public:
ClusterNode() = delete;
ClusterNodeStatus() : mLoadFee(0), mReportTime(0)
{ ; }
ClusterNode(
RippleAddress const& identity,
std::string const& name,
std::uint32_t fee = 0,
std::uint32_t rtime = 0)
: identity_ (identity)
, name_(name)
, mLoadFee(fee)
, mReportTime(rtime)
{ }
explicit ClusterNodeStatus(std::string const& name) :
mNodeName(name), mLoadFee(0), mReportTime(0)
{ ; }
ClusterNodeStatus(
std::string const& name, std::uint32_t fee, std::uint32_t rtime) :
mNodeName(name),
mLoadFee(fee),
mReportTime(rtime)
{ ; }
std::string const& getName()
std::string const& name() const
{
return mNodeName;
return name_;
}
std::uint32_t getLoadFee()
std::uint32_t getLoadFee() const
{
return mLoadFee;
}
std::uint32_t getReportTime()
std::uint32_t getReportTime() const
{
return mReportTime;
}
bool update(ClusterNodeStatus const& status)
RippleAddress const&
identity () const
{
if (status.mReportTime <= mReportTime)
return false;
mLoadFee = status.mLoadFee;
mReportTime = status.mReportTime;
if (mNodeName.empty() || !status.mNodeName.empty())
mNodeName = status.mNodeName;
return true;
return identity_;
}
private:
std::string mNodeName;
std::uint32_t mLoadFee;
std::uint32_t mReportTime;
RippleAddress identity_;
std::string name_;
std::uint32_t mLoadFee = 0;
std::uint32_t mReportTime = 0;
};
} // ripple

View File

@@ -0,0 +1,143 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/app/main/Application.h>
#include <ripple/basics/Log.h>
#include <ripple/core/Config.h>
#include <ripple/core/TimeKeeper.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/ClusterNode.h>
#include <ripple/protocol/JsonFields.h>
#include <ripple/protocol/RippleAddress.h>
#include <ripple/protocol/tokens.h>
#include <boost/regex.hpp>
#include <memory.h>
namespace ripple {
Cluster::Cluster (beast::Journal j)
: j_ (j)
{
}
boost::optional<std::string>
Cluster::member (RippleAddress const& identity) const
{
std::lock_guard<std::mutex> lock(mutex_);
auto iter = nodes_.find (identity);
if (iter == nodes_.end ())
return boost::none;
return iter->name ();
}
std::size_t
Cluster::size() const
{
std::lock_guard<std::mutex> lock(mutex_);
return nodes_.size();
}
bool
Cluster::update (
RippleAddress const& identity,
std::string name,
std::uint32_t loadFee,
std::uint32_t reportTime)
{
std::lock_guard<std::mutex> lock(mutex_);
// We can't use auto here yet due to the libstdc++ issue
// described at https://gcc.gnu.org/bugzilla/show_bug.cgi?id=68190
std::set<ClusterNode, Comparator>::iterator iter =
nodes_.find (identity);
if (iter != nodes_.end ())
{
if (reportTime <= iter->getReportTime())
return false;
if (name.empty())
name = iter->name();
iter = nodes_.erase (iter);
}
nodes_.emplace_hint (iter, identity, name, loadFee, reportTime);
return true;
}
void
Cluster::for_each (
std::function<void(ClusterNode const&)> func) const
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto const& ni : nodes_)
func (ni);
}
std::unique_ptr<Cluster>
make_Cluster (Config const& config, beast::Journal j)
{
static boost::regex const re (
"^" // start of line
"(?:\\s*)" // whitespace (optional)
"([a-zA-Z0-9]*)" // Node identity
"(?:\\s*)" // whitespace (optional)
"(.*\\S*)" // <value>
"(?:\\s*)" // whitespace (optional)
);
auto cluster = std::make_unique<Cluster> (j);
for (auto const& n : config.CLUSTER_NODES)
{
boost::smatch match;
if (!boost::regex_match (n, match, re))
{
JLOG (j.error) <<
"Malformed entry: '" << n << "'";
continue;
}
auto const nid = RippleAddress::createNodePublic (match[1]);
if (!nid.isValid())
{
JLOG (j.error) <<
"Invalid node identity: " << match[1];
continue;
}
if (cluster->member (nid))
{
JLOG (j.warning) <<
"Duplicate node identity: " << match[1];
continue;
}
cluster->update(nid, match[2]);
}
return cluster;
}
} // ripple

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include <BeastConfig.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/impl/ConnectAttempt.h>
#include <ripple/overlay/impl/PeerImp.h>
#include <ripple/overlay/impl/Tuning.h>
@@ -417,15 +418,13 @@ ConnectAttempt::processResponse (beast::http::message const& m,
if(journal_.info) journal_.info <<
"Protocol: " << to_string(protocol);
std::string name;
bool const clusterNode =
app_.getUNL().nodeInCluster(publicKey, name);
if (clusterNode)
auto member = app_.cluster().member(publicKey);
if (member)
if (journal_.info) journal_.info <<
"Cluster name: " << name;
"Cluster name: " << *member;
auto const result = overlay_.peerFinder().activate (slot_,
publicKey.toPublicKey(), clusterNode);
publicKey.toPublicKey(), static_cast<bool>(member));
if (result != PeerFinder::Result::success)
return fail("Outbound slots full");

View File

@@ -24,6 +24,7 @@
#include <ripple/basics/make_SSLContext.h>
#include <ripple/protocol/JsonFields.h>
#include <ripple/server/JsonWriter.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/impl/ConnectAttempt.h>
#include <ripple/overlay/impl/OverlayImpl.h>
#include <ripple/overlay/impl/PeerImp.h>
@@ -252,12 +253,9 @@ OverlayImpl::onHandoff (std::unique_ptr <beast::asio::ssl_bundle>&& ssl_bundle,
if(! success)
return handoff;
std::string name;
bool const cluster = app_.getUNL().nodeInCluster(
publicKey, name);
auto const result = m_peerFinder->activate (slot,
publicKey.toPublicKey(), cluster);
publicKey.toPublicKey(),
static_cast<bool>(app_.cluster().member(publicKey)));
if (result != PeerFinder::Result::success)
{
m_peerFinder->on_closed(slot);

View File

@@ -27,7 +27,6 @@
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/Transaction.h>
#include <ripple/overlay/ClusterNodeStatus.h>
#include <ripple/app/misc/UniqueNodeList.h>
#include <ripple/app/tx/apply.h>
#include <ripple/protocol/digest.h>
@@ -38,6 +37,8 @@
#include <ripple/json/json_reader.h>
#include <ripple/resource/Fees.h>
#include <ripple/server/ServerHandler.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/ClusterNode.h>
#include <ripple/protocol/BuildInfo.h>
#include <ripple/protocol/JsonFields.h>
#include <beast/module/core/diagnostic/SemanticVersion.h>
@@ -613,10 +614,13 @@ void PeerImp::doAccept()
"Protocol: " << to_string(protocol);
if(journal_.info) journal_.info <<
"Public Key: " << publicKey_.humanNodePublic();
bool const cluster = app_.getUNL().nodeInCluster(publicKey_, name_);
if (cluster)
if (auto member = app_.cluster().member(publicKey_))
{
name_ = *member;
if (journal_.info) journal_.info <<
"Cluster name: " << name_;
}
overlay_.activate(shared_from_this());
@@ -912,12 +916,12 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMCluster> const& m)
std::string name;
if (node.has_nodename())
name = node.nodename();
ClusterNodeStatus s(name, node.nodeload(), node.reporttime());
RippleAddress nodePub;
nodePub.setNodePublic(node.publickey());
app_.getUNL().nodeUpdate(nodePub, s);
app_.cluster().update(
RippleAddress::createNodePublic(node.publickey()),
name,
node.nodeload(),
node.reporttime());
}
int loadSources = m->loadsources().size();
@@ -937,7 +941,31 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMCluster> const& m)
overlay_.resourceManager().importConsumers (name_, gossip);
}
app_.getFeeTrack().setClusterFee(app_.getUNL().getClusterFee());
// Calculate the cluster fee:
auto const thresh = app_.timeKeeper().now().time_since_epoch().count() - 90;
std::uint32_t clusterFee = 0;
std::vector<std::uint32_t> fees;
fees.reserve (app_.cluster().size());
app_.cluster().for_each(
[&fees,thresh](ClusterNode const& status)
{
if (status.getReportTime() >= thresh)
fees.push_back (status.getLoadFee ());
});
if (!fees.empty())
{
auto const index = fees.size() / 2;
std::nth_element (
fees.begin(),
fees.begin () + index,
fees.end());
clusterFee = fees[index];
}
app_.getFeeTrack().setClusterFee(clusterFee);
}
void

View File

@@ -0,0 +1,187 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright 2015 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/basics/TestSuite.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/ClusterNode.h>
namespace ripple {
namespace tests {
class cluster_test : public ripple::TestSuite
{
public:
std::unique_ptr<Cluster>
make_Cluster (std::vector<RippleAddress> const& nodes)
{
auto cluster = std::make_unique <Cluster> (beast::Journal ());
for (auto const& n : nodes)
cluster->update (n, n.humanNodePublic());
return cluster;
}
RippleAddress
randomNode ()
{
return RippleAddress::createNodePublic (
RippleAddress::createSeedRandom ());
}
void
testMembership ()
{
// The servers on the network
std::vector<RippleAddress> network;
while (network.size () != 128)
network.push_back (randomNode());
{
testcase ("Membership: Empty cluster");
auto c = make_Cluster ({});
for (auto const& n : network)
expect (!c->member (n));
}
{
testcase ("Membership: Non-empty cluster and none present");
std::vector<RippleAddress> cluster;
while (cluster.size () != 32)
cluster.push_back (randomNode());
auto c = make_Cluster (cluster);
for (auto const& n : network)
expect (!c->member (n));
}
{
testcase ("Membership: Non-empty cluster and some present");
std::vector<RippleAddress> cluster (
network.begin (), network.begin () + 16);
while (cluster.size () != 32)
cluster.push_back (randomNode());
auto c = make_Cluster (cluster);
for (auto const& n : cluster)
expect (c->member (n));
for (auto const& n : network)
{
auto found = std::find (
cluster.begin (), cluster.end (), n);
expect (static_cast<bool>(c->member (n)) ==
(found != cluster.end ()));
}
}
{
testcase ("Membership: Non-empty cluster and all present");
std::vector<RippleAddress> cluster (
network.begin (), network.begin () + 32);
auto c = make_Cluster (cluster);
for (auto const& n : cluster)
expect (c->member (n));
for (auto const& n : network)
{
auto found = std::find (
cluster.begin (), cluster.end (), n);
expect (static_cast<bool>(c->member (n)) ==
(found != cluster.end ()));
}
}
}
void
testUpdating ()
{
testcase ("Updating");
auto c = make_Cluster ({});
auto const node = randomNode ();
std::uint32_t load = 0;
std::uint32_t tick = 0;
// Initial update
expect (c->update (node, "", load, tick));
{
auto member = c->member (node);
expect (static_cast<bool>(member));
expect (member->empty ());
}
// Updating too quickly: should fail
expect (! c->update (node, node.humanNodePublic (), load, tick));
{
auto member = c->member (node);
expect (static_cast<bool>(member));
expect (member->empty ());
}
// Updating the name (empty updates to non-empty)
expect (c->update (node, node.humanNodePublic (), load, ++tick));
{
auto member = c->member (node);
expect (static_cast<bool>(member));
expect (member->compare(node.humanNodePublic ()) == 0);
}
// Updating the name (non-empty doesn't go to empty)
expect (c->update (node, "", load, ++tick));
{
auto member = c->member (node);
expect (static_cast<bool>(member));
expect (member->compare(node.humanNodePublic ()) == 0);
}
// Updating the name (non-empty updates to new non-empty)
expect (c->update (node, "test", load, ++tick));
{
auto member = c->member (node);
expect (static_cast<bool>(member));
expect (member->compare("test") == 0);
}
}
void
run() override
{
testMembership ();
testUpdating ();
}
};
BEAST_DEFINE_TESTSUITE(cluster,overlay,ripple);
} // tests
} // ripple

View File

@@ -74,7 +74,7 @@ JSS ( accounts_proposed ); // in: Subscribe, Unsubscribe
JSS ( action );
JSS ( address ); // out: PeerImp
JSS ( affected ); // out: AcceptedLedgerTx
JSS ( age ); // out: UniqueNodeList, NetworkOPs
JSS ( age ); // out: UniqueNodeList, NetworkOPs, Peers
JSS ( alternatives ); // out: PathRequest, RipplePathFind
JSS ( amendment_blocked ); // out: NetworkOPs
JSS ( asks ); // out: Subscribe
@@ -154,6 +154,7 @@ JSS ( fail_hard ); // in: Sign, Submit
JSS ( failed ); // out: InboundLedger
JSS ( feature ); // in: Feature
JSS ( features ); // out: Feature
JSS ( fee ); // out: NetworkOPs, Peers
JSS ( fee_base ); // out: NetworkOPs
JSS ( fee_mult_max ); // in: TransactionSign
JSS ( fee_ref ); // out: NetworkOPs
@@ -351,6 +352,7 @@ JSS ( subcommand ); // in: PathFind
JSS ( success ); // rpc
JSS ( supported ); // out: AmendmentTableImpl
JSS ( system_time_offset ); // out: NetworkOPs
JSS ( tag ); // out: Peers
JSS ( taker ); // in: Subscribe, BookOffers
JSS ( taker_gets ); // in: Subscribe, Unsubscribe, BookOffers
JSS ( taker_gets_funded ); // out: NetworkOPs

View File

@@ -19,7 +19,10 @@
#include <BeastConfig.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/UniqueNodeList.h>
#include <ripple/app/main/LocalCredentials.h>
#include <ripple/core/LoadFeeTrack.h>
#include <ripple/core/TimeKeeper.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/protocol/JsonFields.h>
#include <ripple/rpc/Context.h>
@@ -36,7 +39,32 @@ Json::Value doPeers (RPC::Context& context)
auto lock = beast::make_lock(context.app.getMasterMutex());
jvResult[jss::peers] = context.app.overlay ().json ();
context.app.getUNL().addClusterStatus(jvResult);
auto const now = context.app.timeKeeper().now().time_since_epoch().count();
auto const self = context.app.getLocalCredentials().getNodePublic();
Json::Value& cluster = (jvResult[jss::cluster] = Json::objectValue);
std::uint32_t ref = context.app.getFeeTrack().getLoadBase();
context.app.cluster().for_each ([&cluster, now, ref, &self]
(ClusterNode const& node)
{
if (node.identity() == self)
return;
Json::Value& json = cluster[node.identity().humanNodePublic()];
if (!node.name().empty())
json[jss::tag] = node.name();
if ((node.getLoadFee() != ref) && (node.getLoadFee() != 0))
json[jss::fee] = static_cast<double>(node.getLoadFee()) / ref;
if (node.getReportTime())
json[jss::age] = (node.getReportTime() >= now)
? 0
: (now - node.getReportTime());
});
}
return jvResult;

View File

@@ -20,6 +20,7 @@
#include <BeastConfig.h>
#include <ripple/overlay/impl/ConnectAttempt.cpp>
#include <ripple/overlay/impl/Cluster.cpp>
#include <ripple/overlay/impl/Manifest.cpp>
#include <ripple/overlay/impl/Message.cpp>
#include <ripple/overlay/impl/OverlayImpl.cpp>
@@ -28,6 +29,7 @@
#include <ripple/overlay/impl/TMHello.cpp>
#include <ripple/overlay/impl/TrafficCount.cpp>
#include <ripple/overlay/tests/cluster_test.cpp>
#include <ripple/overlay/tests/manifest_test.cpp>
#include <ripple/overlay/tests/short_read.test.cpp>
#include <ripple/overlay/tests/TMHello.test.cpp>