Add RPC command shard crawl (RIPD-1663)

commit 3661dc88fe (parent 86c066cd7e)
Author: Miguel Portilla
Committed by: seelabs
Date: 2018-07-10 11:11:29 -04:00
17 changed files with 529 additions and 72 deletions

View File

@@ -707,6 +707,102 @@ OverlayImpl::reportTraffic (
m_traffic.addCount (cat, isInbound, number);
}
Json::Value
OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops)
{
using namespace std::chrono;
using namespace std::chrono_literals;
Json::Value jv(Json::objectValue);
auto const numPeers {size()};
if (numPeers == 0)
return jv;
// If greater than a hop away, we may need to gather or freshen data
if (hops > 0)
{
// Prevent crawl spamming
clock_type::time_point const last(csLast_.load());
if ((clock_type::now() - last) > 60s)
{
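// Timeout scales quadratically with reach: 10s, 40s, 90s for 1, 2, 3 hops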
auto const timeout(seconds((hops * hops) * 10));
std::unique_lock<std::mutex> l {csMutex_};
// Check if already requested
if (csIDs_.empty())
{
{
std::lock_guard <decltype(mutex_)> lock {mutex_};
for (auto& id : ids_)
csIDs_.emplace(id.first);
}
// Relay request to active peers
protocol::TMGetShardInfo tmGS;
tmGS.set_hops(hops);
foreach(send_always(std::make_shared<Message>(
tmGS, protocol::mtGET_SHARD_INFO)));
if (csCV_.wait_for(l, timeout) == std::cv_status::timeout)
{
csIDs_.clear();
csCV_.notify_all();
}
csLast_ = duration_cast<seconds>(
clock_type::now().time_since_epoch());
}
else
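    // Another crawl is already in flight; wait to share its results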
csCV_.wait_for(l, timeout);
}
}
// Combine the shard info from peers and their sub peers
hash_map<PublicKey, PeerImp::ShardInfo> peerShardInfo;
for_each([&](std::shared_ptr<PeerImp> const& peer)
{
if (auto psi = peer->getPeerShardInfo())
{
for (auto const& e : *psi)
{
auto it {peerShardInfo.find(e.first)};
if (it != peerShardInfo.end())
// The key exists so join the shard indexes.
it->second.shardIndexes += e.second.shardIndexes;
else
peerShardInfo.emplace(std::move(e));
}
}
});
// Prepare json reply
auto& av = jv[jss::peers] = Json::Value(Json::arrayValue);
for (auto const& e : peerShardInfo)
{
auto& pv {av.append(Json::Value(Json::objectValue))};
if (pubKey)
pv[jss::public_key] = toBase58(TokenType::NodePublic, e.first);
auto const& address {e.second.endpoint.address()};
if (!address.is_unspecified())
pv[jss::ip] = address.to_string();
pv[jss::complete_shards] = to_string(e.second.shardIndexes);
}
return jv;
}
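The admin RPC handler that exposes this crawl is among the commit's 17 files but outside the hunks shown here. A minimal sketch of the call path, assuming a doCrawlShards handler with jss::limit and jss::public_key request fields and the csHopLimit constant used below in PeerImp (these names are assumptions, not the verbatim file, and the sketch compiles only inside the rippled tree):
Json::Value
doCrawlShards(RPC::Context& context)
{
    // A multi-hop crawl touches the whole overlay, so gate it to admins.
    if (context.role != Role::ADMIN)
        return rpcError(rpcNO_PERMISSION);
    // Hypothetical 'limit' field: number of hops, clamped to csHopLimit.
    std::uint32_t hops {0};
    if (context.params.isMember(jss::limit))
        hops = std::min(context.params[jss::limit].asUInt(), csHopLimit);
    // Hypothetical 'public_key' field: include node public keys in the reply.
    bool const pubKey {context.params.isMember(jss::public_key) &&
        context.params[jss::public_key].asBool()};
    // Only this call is taken directly from the diff above.
    return context.app.overlay().crawlShards(pubKey, hops);
}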
void
OverlayImpl::lastLink(std::uint32_t id)
{
// Notify threads when every peer has received a last link.
// This doesn't account for every node that might reply but
// it is adequate.
std::lock_guard<std::mutex> l {csMutex_};
if (csIDs_.erase(id) && csIDs_.empty())
csCV_.notify_all();
}
std::size_t
OverlayImpl::selectPeers (PeerSet& set, std::size_t limit,
std::function<bool(std::shared_ptr<Peer> const&)> score)
@@ -786,9 +882,12 @@ OverlayImpl::crawl()
sp->getRemoteAddress().port());
}
}
-    auto version = sp->getVersion ();
-    if (! version.empty ())
-        pv[jss::version] = version;
+    {
+        auto version {sp->getVersion()};
+        if (!version.empty())
+            pv[jss::version] = std::move(version);
+    }
std::uint32_t minSeq, maxSeq;
sp->ledgerRange(minSeq, maxSeq);
@@ -797,9 +896,8 @@ OverlayImpl::crawl()
std::to_string(minSeq) + "-" +
std::to_string(maxSeq);
-    auto shards = sp->getShards();
-    if (! shards.empty())
-        pv[jss::complete_shards] = shards;
+    if (auto shardIndexes = sp->getShardIndexes())
+        pv[jss::complete_shards] = to_string(*shardIndexes);
});
return jv;

View File

@@ -123,6 +123,13 @@ private:
std::atomic <uint64_t> peerDisconnects_ {0};
std::atomic <uint64_t> peerDisconnectsCharges_ {0};
// Last time we crawled peers for shard info. 'cs' = crawl shards
std::atomic<std::chrono::seconds> csLast_{std::chrono::seconds{0}};
std::mutex csMutex_;
std::condition_variable csCV_;
// Peer IDs expecting to receive a last link notification
std::set<std::uint32_t> csIDs_;
//--------------------------------------------------------------------------
public:
@@ -221,15 +228,17 @@ public:
void
for_each (UnaryFunc&& f)
{
-    std::lock_guard <decltype(mutex_)> lock (mutex_);
-    // Iterate over a copy of the peer list because peer
-    // destruction can invalidate iterators.
-    std::vector<std::weak_ptr<PeerImp>> wp;
-    wp.reserve(ids_.size());
-    for (auto& x : ids_)
-        wp.push_back(x.second);
+    std::vector<std::weak_ptr<PeerImp>> wp;
+    {
+        std::lock_guard<decltype(mutex_)> lock(mutex_);
+        // Iterate over a copy of the peer list because peer
+        // destruction can invalidate iterators.
+        wp.reserve(ids_.size());
+        for (auto& x : ids_)
+            wp.push_back(x.second);
+    }
for (auto& w : wp)
{
@@ -340,6 +349,17 @@ public:
return peerDisconnectsCharges_;
}
Json::Value
crawlShards(bool pubKey, std::uint32_t hops) override;
/** Called when the last link from a peer chain is received.
@param id peer id that received the shard info.
*/
void
lastLink(std::uint32_t id);
private:
std::shared_ptr<Writer>
makeRedirectResponse (PeerFinder::Slot::ptr const& slot,

View File

@@ -35,6 +35,7 @@
#include <ripple/beast/core/SemanticVersion.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/predicates.h>
#include <ripple/protocol/digest.h>
#include <boost/algorithm/string/predicate.hpp>
@@ -144,6 +145,11 @@ PeerImp::run()
doProtocolStart();
}
// Request shard info from peer
protocol::TMGetShardInfo tmGS;
tmGS.set_hops(0);
send(std::make_shared<Message>(tmGS, protocol::mtGET_SHARD_INFO));
setTimer();
}
@@ -360,16 +366,20 @@ PeerImp::json()
bool
PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const
{
-    std::lock_guard<std::mutex> sl(recentLock_);
-    if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
-        (sanity_.load() == Sanity::sane))
-        return true;
-    if (std::find(recentLedgers_.begin(),
-        recentLedgers_.end(), hash) != recentLedgers_.end())
-        return true;
-    return seq >= app_.getNodeStore().earliestSeq() &&
-        boost::icl::contains(shards_,
-            (seq - 1) / NodeStore::DatabaseShard::ledgersPerShardDefault);
+    {
+        std::lock_guard<std::mutex> sl(recentLock_);
+        if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
+            (sanity_.load() == Sanity::sane))
+            return true;
+        if (std::find(recentLedgers_.begin(),
+            recentLedgers_.end(), hash) != recentLedgers_.end())
+            return true;
+    }
+    if (seq >= app_.getNodeStore().earliestSeq())
+        return hasShard(
+            (seq - 1) / NodeStore::DatabaseShard::ledgersPerShardDefault);
+    return false;
}
void
@@ -385,19 +395,11 @@ PeerImp::ledgerRange (std::uint32_t& minSeq,
bool
PeerImp::hasShard (std::uint32_t shardIndex) const
{
-    std::lock_guard<std::mutex> sl(recentLock_);
-    return boost::icl::contains(shards_, shardIndex);
-}
-
-std::string
-PeerImp::getShards () const
-{
-    {
-        std::lock_guard<std::mutex> sl(recentLock_);
-        if (!shards_.empty())
-            return to_string(shards_);
-    }
-    return {};
+    std::lock_guard<std::mutex> l {shardInfoMutex_};
+    auto const it {shardInfo_.find(publicKey_)};
+    if (it != shardInfo_.end())
+        return boost::icl::contains(it->second.shardIndexes, shardIndex);
+    return false;
}
bool
@@ -478,6 +480,25 @@ PeerImp::fail(std::string const& name, error_code ec)
close();
}
boost::optional<RangeSet<std::uint32_t>>
PeerImp::getShardIndexes() const
{
std::lock_guard<std::mutex> l {shardInfoMutex_};
auto it{shardInfo_.find(publicKey_)};
if (it != shardInfo_.end())
return it->second.shardIndexes;
return boost::none;
}
boost::optional<hash_map<PublicKey, PeerImp::ShardInfo>>
PeerImp::getPeerShardInfo() const
{
std::lock_guard<std::mutex> l {shardInfoMutex_};
if (!shardInfo_.empty())
return shardInfo_;
return boost::none;
}
void
PeerImp::gracefulClose()
{
@@ -995,6 +1016,202 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMCluster> const& m)
app_.getFeeTrack().setClusterFee(clusterFee);
}
void
PeerImp::onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m)
{
if (m->hops() > csHopLimit || m->peerchain_size() > csHopLimit)
{
fee_ = Resource::feeInvalidRequest;
JLOG(p_journal_.warn()) <<
(m->hops() > csHopLimit ?
"Hops (" + std::to_string(m->hops()) + ") exceed limit" :
"Invalid Peerchain");
return;
}
// Reply with shard info we may have
if (auto shardStore = app_.getShardStore())
{
fee_ = Resource::feeLightPeer;
auto shards {shardStore->getCompleteShards()};
if (!shards.empty())
{
protocol::TMShardInfo reply;
reply.set_shardindexes(shards);
if (m->has_lastlink())
reply.set_lastlink(true);
if (m->peerchain_size() > 0)
*reply.mutable_peerchain() = m->peerchain();
send(std::make_shared<Message>(reply, protocol::mtSHARD_INFO));
JLOG(p_journal_.trace()) <<
"Sent shard indexes " << shards;
}
}
// Relay request to peers
if (m->hops() > 0)
{
fee_ = Resource::feeMediumBurdenPeer;
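// Spend one hop; when the budget hits zero, flag the request as the last
// link so replies from the crawl's edge trigger lastLink() at the requester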
m->set_hops(m->hops() - 1);
if (m->hops() == 0)
m->set_lastlink(true);
m->add_peerchain(id());
overlay_.foreach(send_if_not(
std::make_shared<Message>(*m, protocol::mtGET_SHARD_INFO),
match_peer(this)));
}
}
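Taken together with the TMShardInfo handler below, hops and peerchain implement a simple source-routed request/reply: the request fans out while recording the path, and the reply walks the recorded path back. A standalone toy trace of that bookkeeping (illustration only; the peer ids are hypothetical, and the real handlers route through Peer objects via findPeerByShortID):
#include <cstdint>
#include <iostream>
#include <vector>
int main()
{
    std::uint32_t hops = 2;             // as requested by crawlShards(..., 2)
    std::vector<std::uint32_t> peerchain;
    // Outbound: each relaying node spends a hop and records the id of the
    // peer the request arrived from, so a reply can retrace the path.
    for (std::uint32_t id : {7u, 12u})  // hypothetical relay-assigned ids
    {
        --hops;
        peerchain.push_back(id);
        if (hops == 0)
            std::cout << "hop budget spent: set lastlink on the request\n";
    }
    // Inbound: each node forwards the reply to the peer named by the last
    // chain entry, then removes it (RemoveLast in onMessage(TMShardInfo)).
    while (!peerchain.empty())
    {
        std::cout << "forward reply to peer " << peerchain.back() << '\n';
        peerchain.pop_back();
    }
}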
void
PeerImp::onMessage(std::shared_ptr <protocol::TMShardInfo> const& m)
{
if (m->shardindexes().empty() || m->peerchain_size() > csHopLimit)
{
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) <<
(m->shardindexes().empty() ?
"Missing shard indexes" :
"Invalid Peerchain");
return;
}
// Check if the message should be forwarded to another peer
if (m->peerchain_size() > 0)
{
auto const peerId {m->peerchain(m->peerchain_size() - 1)};
if (auto peer = overlay_.findPeerByShortID(peerId))
{
if (!m->has_nodepubkey())
m->set_nodepubkey(publicKey_.data(), publicKey_.size());
if (!m->has_endpoint())
{
// Check if peer will share IP publicly
if (crawl())
m->set_endpoint(remote_address_.address().to_string());
else
m->set_endpoint("0");
}
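// Pop this link off the chain and forward the reply toward the requester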
m->mutable_peerchain()->RemoveLast();
peer->send(std::make_shared<Message>(*m, protocol::mtSHARD_INFO));
JLOG(p_journal_.trace()) <<
"Relayed TMShardInfo to peer with IP " <<
remote_address_.address().to_string();
}
else
{
// Peer is no longer available so the relay ends
fee_ = Resource::feeUnwantedData;
JLOG(p_journal_.info()) <<
"Unable to route shard info";
}
return;
}
// Parse the shard indexes received in the shard info
RangeSet<std::uint32_t> shardIndexes;
{
std::vector<std::string> tokens;
boost::split(tokens, m->shardindexes(), boost::algorithm::is_any_of(","));
for (auto const& t : tokens)
{
std::vector<std::string> seqs;
boost::split(seqs, t, boost::algorithm::is_any_of("-"));
if (seqs.empty() || seqs.size() > 2)
{
fee_ = Resource::feeBadData;
return;
}
std::uint32_t first;
if (!beast::lexicalCastChecked(first, seqs.front()))
{
fee_ = Resource::feeBadData;
return;
}
if (seqs.size() == 1)
shardIndexes.insert(first);
else
{
std::uint32_t second;
if (!beast::lexicalCastChecked(second, seqs.back()))
{
fee_ = Resource::feeBadData;
return;
}
shardIndexes.insert(range(first, second));
}
}
}
// Get the public key of the node reporting the shard info
PublicKey publicKey;
if (m->has_nodepubkey())
publicKey = PublicKey(makeSlice(m->nodepubkey()));
else
publicKey = publicKey_;
// Get the IP of the node reporting the shard info
beast::IP::Endpoint endpoint;
if (m->has_endpoint())
{
if (m->endpoint() != "0")
{
auto result {
beast::IP::Endpoint::from_string_checked(m->endpoint())};
if (!result.second)
{
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) <<
"failed to parse incoming endpoint: {" <<
m->endpoint() << "}";
return;
}
endpoint = std::move(result.first);
}
}
else if (crawl()) // Check if peer will share IP publicly
endpoint = remote_address_;
{
std::lock_guard<std::mutex> l {shardInfoMutex_};
auto it {shardInfo_.find(publicKey)};
if (it != shardInfo_.end())
{
// Update the IP address for the node
it->second.endpoint = std::move(endpoint);
// Join the shard index range set
it->second.shardIndexes += shardIndexes;
}
else
{
// Add a new node
ShardInfo shardInfo;
shardInfo.endpoint = std::move(endpoint);
shardInfo.shardIndexes = std::move(shardIndexes);
shardInfo_.emplace(publicKey, std::move(shardInfo));
}
}
JLOG(p_journal_.trace()) <<
"Consumed TMShardInfo originating from public key " <<
toBase58(TokenType::NodePublic, publicKey) <<
" shard indexes " << m->shardindexes();
if (m->has_lastlink())
overlay_.lastLink(id_);
}
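The shardindexes string tokenized above is the compact form that to_string produces for a RangeSet, the same form used by crawlShards and getCompleteShards. A small round-trip sketch, assuming it is compiled inside the rippled tree, where ripple/basics/RangeSet.h supplies RangeSet, range, and to_string:
#include <ripple/basics/RangeSet.h>
#include <cstdint>
#include <iostream>
int main()
{
    using namespace ripple;
    RangeSet<std::uint32_t> shardIndexes;
    shardIndexes.insert(1);                            // a lone shard index
    shardIndexes.insert(range<std::uint32_t>(3, 5));   // a contiguous run
    shardIndexes.insert(9);
    // Prints "1,3-5,9": comma-separated singletons and dashed ranges,
    // exactly the format the TMShardInfo handler above tokenizes.
    std::cout << to_string(shardIndexes) << '\n';
}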
void
PeerImp::onMessage (std::shared_ptr <protocol::TMGetPeers> const& m)
{
@@ -1414,26 +1631,6 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
minLedger_ = 0;
}
-    if (m->has_shardseqs())
-    {
-        std::vector<std::string> tokens;
-        boost::split(tokens, m->shardseqs(), boost::algorithm::is_any_of(","));
-        std::lock_guard<std::mutex> sl(recentLock_);
-        shards_.clear();
-        for (auto const& t : tokens)
-        {
-            std::vector<std::string> seqs;
-            boost::split(seqs, t, boost::algorithm::is_any_of("-"));
-            if (seqs.size() == 1)
-                shards_.insert(
-                    beast::lexicalCastThrow<std::uint32_t>(seqs.front()));
-            else if (seqs.size() == 2)
-                shards_.insert(range(
-                    beast::lexicalCastThrow<std::uint32_t>(seqs.front()),
-                    beast::lexicalCastThrow<std::uint32_t>(seqs.back())));
-        }
-    }
if (m->has_ledgerseq() &&
app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
{
@@ -1509,9 +1706,6 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
Json::UInt (m->lastseq ());
}
-    if (m->has_shardseqs())
-        j[jss::complete_shards] = m->shardseqs();
return j;
});
}

View File

@@ -22,16 +22,18 @@
#include <ripple/app/consensus/RCLCxPeerPos.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/beast/utility/WrappedSink.h>
#include <ripple/overlay/impl/ProtocolMessage.h>
#include <ripple/overlay/impl/OverlayImpl.h>
#include <ripple/peerfinder/PeerfinderManager.h>
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/STTx.h>
#include <ripple/protocol/STValidation.h>
#include <ripple/resource/Fees.h>
#include <boost/endian/conversion.hpp>
#include <boost/optional.hpp>
#include <cstdint>
#include <deque>
#include <queue>
@@ -77,6 +79,12 @@ public:
,sane
};
struct ShardInfo
{
beast::IP::Endpoint endpoint;
RangeSet<std::uint32_t> shardIndexes;
};
using ptr = std::shared_ptr <PeerImp>;
private:
@@ -107,7 +115,7 @@ private:
// Updated at each stage of the connection process to reflect
// the current conditions as closely as possible.
-    beast::IP::Endpoint remote_address_;
+    beast::IP::Endpoint const remote_address_;
// These are up here to prevent warnings about order of initializations
//
@@ -126,7 +134,6 @@ private:
//
LedgerIndex minLedger_ = 0;
LedgerIndex maxLedger_ = 0;
-    RangeSet<std::uint32_t> shards_;
uint256 closedLedgerHash_;
uint256 previousLedgerHash_;
std::deque<uint256> recentLedgers_;
@@ -155,6 +162,9 @@ private:
std::unique_ptr <LoadEvent> load_event_;
bool hopsAware_ = false;
std::mutex mutable shardInfoMutex_;
hash_map<PublicKey, ShardInfo> shardInfo_;
friend class OverlayImpl;
public:
@@ -235,6 +245,7 @@ public:
return id_;
}
/** Returns `true` if this connection will publicly share its IP address. */
bool
crawl() const;
@@ -301,9 +312,6 @@ public:
bool
hasShard (std::uint32_t shardIndex) const override;
-    std::string
-    getShards () const override;
bool
hasTxSet (uint256 const& hash) const override;
@@ -326,6 +334,14 @@ public:
void
fail(std::string const& reason);
/** Return a range set of known shard indexes from this peer. */
boost::optional<RangeSet<std::uint32_t>>
getShardIndexes() const;
/** Return any known shard info from this peer and its sub peers. */
boost::optional<hash_map<PublicKey, ShardInfo>>
getPeerShardInfo() const;
private:
void
close();
@@ -412,6 +428,8 @@ public:
void onMessage (std::shared_ptr <protocol::TMManifests> const& m);
void onMessage (std::shared_ptr <protocol::TMPing> const& m);
void onMessage (std::shared_ptr <protocol::TMCluster> const& m);
void onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m);
void onMessage (std::shared_ptr <protocol::TMShardInfo> const& m);
void onMessage (std::shared_ptr <protocol::TMGetPeers> const& m);
void onMessage (std::shared_ptr <protocol::TMPeers> const& m);
void onMessage (std::shared_ptr <protocol::TMEndpoints> const& m);

View File

@@ -46,6 +46,8 @@ protocolMessageName (int type)
case protocol::mtPING: return "ping";
case protocol::mtPROOFOFWORK: return "proof_of_work";
case protocol::mtCLUSTER: return "cluster";
case protocol::mtGET_SHARD_INFO: return "get_shard_info";
case protocol::mtSHARD_INFO: return "shard_info";
case protocol::mtGET_PEERS: return "get_peers";
case protocol::mtPEERS: return "peers";
case protocol::mtENDPOINTS: return "endpoints";
@@ -117,6 +119,8 @@ invokeProtocolMessage (Buffers const& buffers, Handler& handler)
case protocol::mtMANIFESTS: ec = detail::invoke<protocol::TMManifests> (type, buffers, handler); break;
case protocol::mtPING: ec = detail::invoke<protocol::TMPing> (type, buffers, handler); break;
case protocol::mtCLUSTER: ec = detail::invoke<protocol::TMCluster> (type, buffers, handler); break;
case protocol::mtGET_SHARD_INFO: ec = detail::invoke<protocol::TMGetShardInfo> (type, buffers, handler); break;
case protocol::mtSHARD_INFO: ec = detail::invoke<protocol::TMShardInfo> (type, buffers, handler); break;
case protocol::mtGET_PEERS: ec = detail::invoke<protocol::TMGetPeers> (type, buffers, handler); break;
case protocol::mtPEERS: ec = detail::invoke<protocol::TMPeers> (type, buffers, handler); break;
case protocol::mtENDPOINTS: ec = detail::invoke<protocol::TMEndpoints> (type, buffers, handler); break;

View File

@@ -64,6 +64,8 @@ TrafficCount::category TrafficCount::categorize (
if ((type == protocol::mtMANIFESTS) ||
(type == protocol::mtENDPOINTS) ||
(type == protocol::mtGET_SHARD_INFO) ||
(type == protocol::mtSHARD_INFO) ||
(type == protocol::mtPEERS) ||
(type == protocol::mtGET_PEERS))
return TrafficCount::category::CT_overlay;