Use cluster state in Slot instead of PeerImp

This commit is contained in:
Vinnie Falco
2014-11-24 07:11:09 -08:00
committed by Nik Bougalis
parent cd8ec89cbb
commit fb0d44d403
5 changed files with 41 additions and 51 deletions

View File

@@ -72,6 +72,11 @@ public:
id_t
id() const = 0;
/** Returns `true` if this connection is a member of the cluster. */
virtual
bool
cluster() const = 0;
virtual
RippleAddress const&
getNodePublic() const = 0;
@@ -79,16 +84,6 @@ public:
virtual
Json::Value json() = 0;
// VFALCO TODO Replace both with
// boost::optional<std::string> const& cluster_id();
virtual
bool
isInCluster() const = 0;
virtual
std::string const&
getClusterNodeName() const = 0;
//
// Ledger
//

View File

@@ -241,15 +241,18 @@ OverlayImpl::onHandoff (std::unique_ptr <beast::asio::ssl_bundle>&& ssl_bundle,
handoff.moved = true;
bool success = true;
protocol::TMHello hello;
std::tie(hello, success) = parseHello (request, journal_);
if(! success)
return handoff;
uint256 sharedValue;
std::tie(sharedValue, success) = makeSharedValue(
ssl_bundle->stream.native_handle(), journal_);
if(! success)
return handoff;
RippleAddress publicKey;
std::tie(publicKey, success) = verifyHello (hello,
sharedValue, journal_, getApp());
@@ -257,11 +260,11 @@ OverlayImpl::onHandoff (std::unique_ptr <beast::asio::ssl_bundle>&& ssl_bundle,
return handoff;
std::string name;
bool clusterNode = getApp().getUNL().nodeInCluster(
bool const cluster = getApp().getUNL().nodeInCluster(
publicKey, name);
auto const result = m_peerFinder->activate (slot,
RipplePublicKey(publicKey), clusterNode);
auto const result = m_peerFinder->activate (slot,
RipplePublicKey(publicKey), cluster);
if (result != PeerFinder::Result::success)
{
if (journal_.trace) journal_.trace <<
@@ -352,7 +355,7 @@ OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint)
}
}
//--------------------------------------------------------------------------
//------------------------------------------------------------------------------
void
OverlayImpl::remove (PeerFinder::Slot::ptr const& slot)
@@ -363,11 +366,11 @@ OverlayImpl::remove (PeerFinder::Slot::ptr const& slot)
m_peers.erase (iter);
}
//--------------------------------------------------------------------------
//------------------------------------------------------------------------------
//
// Stoppable
//
//--------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Caller must hold the mutex
void
@@ -464,18 +467,18 @@ OverlayImpl::onChildrenStopped ()
checkStopped ();
}
//--------------------------------------------------------------------------
//------------------------------------------------------------------------------
//
// PropertyStream
//
//--------------------------------------------------------------------------
//------------------------------------------------------------------------------
void
OverlayImpl::onWrite (beast::PropertyStream::Map& stream)
{
}
//--------------------------------------------------------------------------
//------------------------------------------------------------------------------
/** A peer has connected successfully
This is called after the peer handshake has been completed and during
peer activation. At this point, the peer address and the public key

View File

@@ -93,7 +93,7 @@ PeerImp::PeerImp (id_t id, beast::IP::Endpoint remoteAddress,
PeerImp::~PeerImp ()
{
if (clusterNode_)
if (cluster())
if (journal_.warning) journal_.warning <<
name_ << " left cluster";
if (state_ == State::active)
@@ -192,7 +192,7 @@ PeerImp::json()
if (m_inbound)
ret["inbound"] = true;
if (clusterNode_)
if (cluster())
{
ret["cluster"] = true;
@@ -708,13 +708,13 @@ PeerImp::processResponse (beast::http::message const& m,
"Protocol: " << to_string(protocol);
if(journal_.info) journal_.info <<
"Public Key: " << publicKey_.humanNodePublic();
clusterNode_ = getApp().getUNL().nodeInCluster(publicKey_, name_);
if (clusterNode_)
bool const cluster = getApp().getUNL().nodeInCluster(publicKey_, name_);
if (cluster)
if (journal_.info) journal_.info <<
"Cluster name: " << name_;
auto const result = overlay_.peerFinder().activate (slot_,
RipplePublicKey(publicKey_), clusterNode_);
auto const result = overlay_.peerFinder().activate (
slot_, RipplePublicKey(publicKey_), cluster);
if (result != PeerFinder::Result::success)
return fail("Outbound slots full");
@@ -787,8 +787,8 @@ void PeerImp::doAccept()
"Protocol: " << to_string(protocol);
if(journal_.info) journal_.info <<
"Public Key: " << publicKey_.humanNodePublic();
clusterNode_ = getApp().getUNL().nodeInCluster(publicKey_, name_);
if (clusterNode_)
bool const cluster = getApp().getUNL().nodeInCluster(publicKey_, name_);
if (cluster)
if (journal_.info) journal_.info <<
"Cluster name: " << name_;
@@ -1075,8 +1075,8 @@ PeerImp::on_message (std::shared_ptr <protocol::TMHello> const& m)
"Protocol: " << to_string(protocol);
if(journal_.info) journal_.info <<
"Public Key: " << publicKey_.humanNodePublic();
clusterNode_ = getApp().getUNL().nodeInCluster(publicKey_, name_);
if (clusterNode_)
bool const cluster = getApp().getUNL().nodeInCluster(publicKey_, name_);
if (cluster)
if (journal_.info) journal_.info <<
"Cluster name: " << name_;
@@ -1086,7 +1086,7 @@ PeerImp::on_message (std::shared_ptr <protocol::TMHello> const& m)
hello_ = *m;
auto const result = overlay_.peerFinder().activate (slot_,
RipplePublicKey (publicKey_), clusterNode_);
RipplePublicKey (publicKey_), cluster);
if (result == PeerFinder::Result::success)
{
@@ -1236,7 +1236,8 @@ PeerImp::error_code
PeerImp::on_message (std::shared_ptr <protocol::TMCluster> const& m)
{
error_code ec;
if (!clusterNode_)
// VFALCO NOTE I think we should drop the peer immediately
if (! cluster())
{
charge (Resource::feeUnwantedData);
return ec;
@@ -1387,7 +1388,7 @@ PeerImp::on_message (std::shared_ptr <protocol::TMTransaction> const& m)
p_journal_.debug <<
"Got tx " << txID;
if (clusterNode_)
if (cluster())
{
if (! m->has_deferred () || ! m->deferred ())
{
@@ -2053,7 +2054,7 @@ PeerImp::checkPropose (Job& job,
"proposal with previous ledger";
memcpy (prevLedger.begin (), set.previousledger ().data (), 256 / 8);
if (!clusterNode_ && !proposal->checkSign (set.signature ()))
if (! cluster() && !proposal->checkSign (set.signature ()))
{
p_journal_.warning <<
"Proposal with previous ledger fails sig check";
@@ -2114,7 +2115,7 @@ PeerImp::checkValidation (Job&, STValidation::pointer val,
{
// VFALCO Which functions throw?
uint256 signingHash = val->getSigningHash();
if (!clusterNode_ && !val->isValid (signingHash))
if (! cluster() && !val->isValid (signingHash))
{
p_journal_.warning <<
"Validation is invalid";
@@ -2248,7 +2249,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
}
else
{
if (getApp().getFeeTrack().isLoadedLocal() && !clusterNode_)
if (getApp().getFeeTrack().isLoadedLocal() && ! cluster())
{
p_journal_.debug <<
"GetLedger: Too busy";

View File

@@ -123,9 +123,6 @@ private:
State state_; // Current state
bool detaching_ = false;
// True if peer is a node in our cluster
bool clusterNode_ = false;
// Node public key of peer.
RippleAddress publicKey_;
@@ -239,6 +236,12 @@ public:
return id_;
}
bool
cluster() const override
{
return slot_->cluster();
}
RippleAddress const&
getNodePublic () const override
{
@@ -248,18 +251,6 @@ public:
Json::Value
json() override;
bool
isInCluster () const override
{
return clusterNode_;
}
std::string const&
getClusterNodeName() const override
{
return name_;
}
//
// Ledger
//

View File

@@ -142,7 +142,7 @@ struct peer_in_cluster
if (skipPeer (peer))
return false;
if (!peer->isInCluster ())
if (! peer->cluster())
return false;
return true;