Remove tracking by public key in Overlay

This commit is contained in:
Nik Bougalis
2016-02-11 09:28:44 -08:00
parent 78ce7a08c0
commit d321b446db
3 changed files with 81 additions and 126 deletions

View File

@@ -273,7 +273,14 @@ OverlayImpl::onHandoff (std::unique_ptr <beast::asio::ssl_bundle>&& ssl_bundle,
// while holding the lock, otherwise new I/O can be
// queued after a call to stop().
std::lock_guard <decltype(mutex_)> lock (mutex_);
add(peer);
{
auto const result =
m_peers.emplace (peer->slot(), peer);
assert (result.second);
(void) result.second;
}
list_.emplace(peer.get(), peer);
peer->run();
}
handoff.moved = true;
@@ -378,19 +385,11 @@ OverlayImpl::add_active (std::shared_ptr<PeerImp> const& peer)
(void) result.second;
}
// Now track this peer
{
auto const result (m_shortIdMap.emplace (
auto const result = ids_.emplace (
std::piecewise_construct,
std::make_tuple (peer->id()),
std::make_tuple (peer)));
assert(result.second);
(void) result.second;
}
{
auto const result (m_publicKeyMap.emplace(
peer->getNodePublic(), peer));
std::make_tuple (peer));
assert(result.second);
(void) result.second;
}
@@ -622,11 +621,10 @@ OverlayImpl::onWrite (beast::PropertyStream::Map& stream)
void
OverlayImpl::activate (std::shared_ptr<PeerImp> const& peer)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
// Now track this peer
{
auto const result (m_shortIdMap.emplace (
std::lock_guard <decltype(mutex_)> lock (mutex_);
auto const result (ids_.emplace (
std::piecewise_construct,
std::make_tuple (peer->id()),
std::make_tuple (peer)));
@@ -634,13 +632,6 @@ OverlayImpl::activate (std::shared_ptr<PeerImp> const& peer)
(void) result.second;
}
{
auto const result (m_publicKeyMap.emplace(
peer->getNodePublic(), peer));
assert(result.second);
(void) result.second;
}
journal_.debug <<
"activated " << peer->getRemoteAddress() <<
" (" << peer->id() <<
@@ -653,12 +644,10 @@ OverlayImpl::activate (std::shared_ptr<PeerImp> const& peer)
}
void
OverlayImpl::onPeerDeactivate (Peer::id_t id,
PublicKey const& publicKey)
OverlayImpl::onPeerDeactivate (Peer::id_t id)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
m_shortIdMap.erase(id);
m_publicKeyMap.erase(publicKey);
ids_.erase(id);
}
void
@@ -756,25 +745,28 @@ OverlayImpl::selectPeers (PeerSet& set, std::size_t limit,
std::function<bool(std::shared_ptr<Peer> const&)> score)
{
using item = std::pair<int, std::shared_ptr<PeerImp>>;
std::vector<item> v;
v.reserve(size());
for_each ([&](std::shared_ptr<PeerImp>&& e)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
v.reserve(m_publicKeyMap.size());
for_each_unlocked ([&](std::shared_ptr<PeerImp> && e)
{
auto const s = e->getScore(score(e));
v.emplace_back(s, std::move(e));
});
}
std::sort(v.begin(), v.end(),
[](item const& lhs, item const&rhs)
{
return lhs.first > rhs.first;
auto const s = e->getScore(score(e));
v.emplace_back(s, std::move(e));
});
std::sort(v.begin(), v.end(),
[](item const& lhs, item const&rhs)
{
return lhs.first > rhs.first;
});
std::size_t accepted = 0;
for (auto const& e : v)
{
if (set.insert(e.second) && ++accepted >= limit)
break;
}
return accepted;
}
@@ -786,7 +778,7 @@ std::size_t
OverlayImpl::size()
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
return m_publicKeyMap.size ();
return ids_.size ();
}
Json::Value
@@ -795,39 +787,37 @@ OverlayImpl::crawl()
using namespace std::chrono;
Json::Value jv;
auto& av = jv["active"] = Json::Value(Json::arrayValue);
std::lock_guard <decltype(mutex_)> lock (mutex_);
for (auto const& e : m_publicKeyMap)
for_each ([&](std::shared_ptr<PeerImp>&& sp)
{
if (auto const sp = e.second.lock())
auto& pv = av.append(Json::Value(Json::objectValue));
pv[jss::public_key] = beast::base64_encode(
sp->getNodePublic().data(),
sp->getNodePublic().size());
pv[jss::type] = sp->slot()->inbound() ?
"in" : "out";
pv[jss::uptime] =
static_cast<std::uint32_t>(duration_cast<seconds>(
sp->uptime()).count());
if (sp->crawl())
{
auto& pv = av.append(Json::Value(Json::objectValue));
pv[jss::public_key] = beast::base64_encode(
sp->getNodePublic().data(),
sp->getNodePublic().size());
pv[jss::type] = sp->slot()->inbound() ?
"in" : "out";
pv[jss::uptime] =
static_cast<std::uint32_t>(duration_cast<seconds>(
sp->uptime()).count());
if (sp->crawl())
pv[jss::ip] = sp->getRemoteAddress().address().to_string();
if (sp->slot()->inbound())
{
pv[jss::ip] = sp->getRemoteAddress().address().to_string();
if (sp->slot()->inbound())
{
if (auto port = sp->slot()->listening_port())
pv[jss::port] = *port;
}
else
{
pv[jss::port] = std::to_string(
sp->getRemoteAddress().port());
}
if (auto port = sp->slot()->listening_port())
pv[jss::port] = *port;
}
else
{
pv[jss::port] = std::to_string(
sp->getRemoteAddress().port());
}
auto version = sp->getVersion ();
if (!version.empty ())
pv["version"] = version;
}
}
auto version = sp->getVersion ();
if (!version.empty ())
pv["version"] = version;
});
return jv;
}
@@ -859,14 +849,12 @@ Overlay::PeerSequence
OverlayImpl::getActivePeers()
{
Overlay::PeerSequence ret;
std::lock_guard <decltype(mutex_)> lock (mutex_);
ret.reserve (m_publicKeyMap.size ());
for (auto const& e : m_publicKeyMap)
ret.reserve(size());
for_each ([&ret](std::shared_ptr<PeerImp>&& sp)
{
auto const sp = e.second.lock();
if (sp)
ret.push_back(sp);
}
ret.emplace_back(std::move(sp));
});
return ret;
}
@@ -874,33 +862,27 @@ OverlayImpl::getActivePeers()
void
OverlayImpl::checkSanity (std::uint32_t index)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
for (auto const& e : m_publicKeyMap)
for_each ([index](std::shared_ptr<PeerImp>&& sp)
{
if (auto const sp = e.second.lock())
sp->checkSanity (index);
}
sp->checkSanity (index);
});
}
void
OverlayImpl::check ()
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
for (auto const& e : m_publicKeyMap)
for_each ([](std::shared_ptr<PeerImp>&& sp)
{
if (auto const sp = e.second.lock())
sp->check ();
}
sp->check ();
});
}
Peer::ptr
OverlayImpl::findPeerByShortID (Peer::id_t const& id)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
auto const iter = m_shortIdMap.find (id);
if (iter != m_shortIdMap.end ())
auto const iter = ids_.find (id);
if (iter != ids_.end ())
return iter->second.lock();
return Peer::ptr();
}
@@ -912,7 +894,7 @@ OverlayImpl::send (protocol::TMProposeSet& m)
m.set_hops(0);
auto const sm = std::make_shared<Message>(
m, protocol::mtPROPOSE_LEDGER);
for_each([&](std::shared_ptr<PeerImp> const& p)
for_each([&](std::shared_ptr<PeerImp>&& p)
{
if (! m.has_hops() || p->hopsAware())
p->send(sm);
@@ -925,7 +907,7 @@ OverlayImpl::send (protocol::TMValidation& m)
m.set_hops(0);
auto const sm = std::make_shared<Message>(
m, protocol::mtVALIDATION);
for_each([&](std::shared_ptr<PeerImp> const& p)
for_each([&](std::shared_ptr<PeerImp>&& p)
{
if (! m.has_hops() || p->hopsAware())
p->send(sm);
@@ -944,7 +926,7 @@ OverlayImpl::relay (protocol::TMProposeSet& m,
return;
auto const sm = std::make_shared<Message>(
m, protocol::mtPROPOSE_LEDGER);
for_each([&](std::shared_ptr<PeerImp> const& p)
for_each([&](std::shared_ptr<PeerImp>&& p)
{
if (skip.find(p->id()) != skip.end())
return;
@@ -965,7 +947,7 @@ OverlayImpl::relay (protocol::TMValidation& m,
return;
auto const sm = std::make_shared<Message>(
m, protocol::mtVALIDATION);
for_each([&](std::shared_ptr<PeerImp> const& p)
for_each([&](std::shared_ptr<PeerImp>&& p)
{
if (skip.find(p->id()) != skip.end())
return;
@@ -976,18 +958,6 @@ OverlayImpl::relay (protocol::TMValidation& m,
//------------------------------------------------------------------------------
void
OverlayImpl::add (std::shared_ptr<PeerImp> const& peer)
{
{
auto const result =
m_peers.emplace (peer->slot(), peer);
assert (result.second);
(void) result.second;
}
list_.emplace(peer.get(), peer);
}
void
OverlayImpl::remove (Child& child)
{

View File

@@ -115,8 +115,7 @@ private:
TrafficCount m_traffic;
hash_map <PeerFinder::Slot::ptr,
std::weak_ptr <PeerImp>> m_peers;
hash_map<PublicKey, std::weak_ptr<PeerImp>> m_publicKeyMap;
hash_map<Peer::id_t, std::weak_ptr<PeerImp>> m_shortIdMap;
hash_map<Peer::id_t, std::weak_ptr<PeerImp>> ids_;
Resolver& m_resolver;
std::atomic <Peer::id_t> next_id_;
ManifestCache manifestCache_;
@@ -226,29 +225,21 @@ public:
// Called when an active peer is destroyed.
void
onPeerDeactivate (Peer::id_t id, PublicKey const& publicKey);
onPeerDeactivate (Peer::id_t id);
// UnaryFunc will be called as
// void(std::shared_ptr<PeerImp>&&)
//
template <class UnaryFunc>
void
for_each_unlocked (UnaryFunc&& f)
{
for (auto const& e : m_publicKeyMap)
{
auto sp = e.second.lock();
if (sp)
f(std::move(sp));
}
}
template <class UnaryFunc>
void
for_each (UnaryFunc&& f)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
for_each_unlocked(f);
for (auto const& e : ids_)
{
if (auto sp = e.second.lock())
f(std::move(sp));
}
}
std::size_t
@@ -331,9 +322,6 @@ private:
//--------------------------------------------------------------------------
void
add (std::shared_ptr<PeerImp> const& peer);
void
remove (Child& child);

View File

@@ -99,10 +99,7 @@ PeerImp::~PeerImp ()
if (journal_.warning) journal_.warning <<
name_ << " left cluster";
if (state_ == State::active)
{
assert(publicKey_.size() != 0);
overlay_.onPeerDeactivate(id_, publicKey_);
}
overlay_.onPeerDeactivate(id_);
overlay_.peerFinder().on_closed (slot_);
overlay_.remove (slot_);
}