Add RPC command shard crawl (RIPD-1663)

This commit is contained in:
Miguel Portilla
2018-07-10 11:11:29 -04:00
parent 3e22a1e9e8
commit 17e0d098a0
16 changed files with 473 additions and 74 deletions

View File

@@ -628,12 +628,6 @@ RCLConsensus::Adaptor::notify(
}
s.set_firstseq(uMin);
s.set_lastseq(uMax);
if (auto shardStore = app_.getShardStore())
{
auto shards = shardStore->getCompleteShards();
if (! shards.empty())
s.set_shardseqs(shards);
}
app_.overlay ().foreach (send_always (
std::make_shared <Message> (
s, protocol::mtSTATUS_CHANGE)));

View File

@@ -1146,6 +1146,7 @@ public:
{ "submit_multisigned", &RPCParser::parseSubmitMultiSigned, 1, 1 },
{ "server_info", &RPCParser::parseServerInfo, 0, 1 },
{ "server_state", &RPCParser::parseServerInfo, 0, 1 },
{ "shards", &RPCParser::parseAsIs, 0, 0 },
{ "stop", &RPCParser::parseAsIs, 0, 0 },
{ "transaction_entry", &RPCParser::parseTransactionEntry, 2, 2 },
{ "tx", &RPCParser::parseTx, 1, 2 },

View File

@@ -25,6 +25,8 @@
#include <ripple/basics/random.h>
#include <ripple/nodestore/DummyScheduler.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/predicates.h>
#include <ripple/protocol/HashPrefix.h>
namespace ripple {
@@ -513,6 +515,14 @@ DatabaseShardImp::setStored(std::shared_ptr<Ledger const> const& ledger)
complete_.emplace(incomplete_->index(), std::move(incomplete_));
incomplete_.reset();
updateStats(l);
// Update peers with new shard index
protocol::TMShardInfo message;
auto const& publicKey {app_.nodeIdentity().first};
message.set_nodepubkey(publicKey.data(), publicKey.size());
message.set_shardindexes(std::to_string(shardIndex));
app_.overlay().foreach(send_always(
std::make_shared<Message>(message, protocol::mtSHARD_INFO)));
}
}

View File

@@ -241,6 +241,15 @@ public:
virtual std::uint64_t getPeerDisconnect() const = 0;
virtual void incPeerDisconnectCharges() = 0;
virtual std::uint64_t getPeerDisconnectCharges() const = 0;
/** Returns information reported to the crawl shard RPC command.
@param hops the maximum jumps the crawler will attempt.
The number of hops achieved is not guaranteed.
*/
virtual
Json::Value
crawlShards(bool pubKey, std::uint32_t hops) = 0;
};
struct ScoreHasLedger

View File

@@ -100,7 +100,6 @@ public:
virtual bool hasLedger (uint256 const& hash, std::uint32_t seq) const = 0;
virtual void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const = 0;
virtual bool hasShard (std::uint32_t shardIndex) const = 0;
virtual std::string getShards() const = 0;
virtual bool hasTxSet (uint256 const& hash) const = 0;
virtual void cycleStatus () = 0;
virtual bool supportsVersion (int version) = 0;

View File

@@ -708,6 +708,98 @@ OverlayImpl::reportTraffic (
m_traffic.addCount (cat, isInbound, number);
}
Json::Value
OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops)
{
using namespace std::chrono;
using namespace std::chrono_literals;
Json::Value jv(Json::objectValue);
auto const numPeers {size()};
if (numPeers == 0)
return jv;
// If greater than a hop away, we may need to gather or freshen data
if (hops > 0)
{
// Prevent crawl spamming
clock_type::time_point const last(csLast_.load());
if (duration_cast<seconds>(clock_type::now() - last) > 60s)
{
auto const timeout(seconds((hops * hops) * 10));
std::unique_lock<std::mutex> l {csMutex_};
// Check if already requested
if (csIDs_.empty())
{
{
std::lock_guard <decltype(mutex_)> lock {mutex_};
for (auto& id : ids_)
csIDs_.emplace(id.first);
}
// Relay request to active peers
protocol::TMGetShardInfo tmGS;
tmGS.set_hops(hops);
foreach(send_always(std::make_shared<Message>(
tmGS, protocol::mtGET_SHARD_INFO)));
if (csCV_.wait_for(l, timeout) == std::cv_status::timeout)
{
csIDs_.clear();
csCV_.notify_all();
}
csLast_ = duration_cast<seconds>(
clock_type::now().time_since_epoch());
}
else
csCV_.wait_for(l, timeout);
}
}
// Combine the shard info from peers and their sub peers
hash_map<PublicKey, PeerImp::ShardInfo> peerShardInfo;
for_each([&](std::shared_ptr<PeerImp>&& peer)
{
if (auto psi = peer->getPeerShardInfo())
{
for (auto const& e : *psi)
{
auto it {peerShardInfo.find(e.first)};
if (it != peerShardInfo.end())
// The key exists so join the shard indexes.
it->second.shardIndexes += e.second.shardIndexes;
else
peerShardInfo.emplace(std::move(e));
}
}
});
// Prepare json reply
auto& av = jv[jss::peers] = Json::Value(Json::arrayValue);
for (auto const& e : peerShardInfo)
{
auto& pv {av.append(Json::Value(Json::objectValue))};
if (pubKey)
pv[jss::public_key] = toBase58(TokenType::NodePublic, e.first);
pv[jss::ip] = e.second.endpoint.address().to_string();
pv[jss::complete_shards] = to_string(e.second.shardIndexes);
}
return jv;
}
void
OverlayImpl::lastLink(std::uint32_t id)
{
// Notify threads when every peer has received a last link.
// This doesn't account for every node that might reply but
// it is adequate.
std::lock_guard<std::mutex> l {csMutex_};
if (csIDs_.erase(id) && csIDs_.empty())
csCV_.notify_all();
}
std::size_t
OverlayImpl::selectPeers (PeerSet& set, std::size_t limit,
std::function<bool(std::shared_ptr<Peer> const&)> score)
@@ -787,9 +879,12 @@ OverlayImpl::crawl()
sp->getRemoteAddress().port());
}
}
auto version = sp->getVersion ();
if (! version.empty ())
pv[jss::version] = version;
{
auto version {sp->getVersion()};
if (!version.empty())
pv[jss::version] = std::move(version);
}
std::uint32_t minSeq, maxSeq;
sp->ledgerRange(minSeq, maxSeq);
@@ -798,9 +893,8 @@ OverlayImpl::crawl()
std::to_string(minSeq) + "-" +
std::to_string(maxSeq);
auto shards = sp->getShards();
if (! shards.empty())
pv[jss::complete_shards] = shards;
if (auto shardIndexes = sp->getShardIndexes())
pv[jss::complete_shards] = to_string(shardIndexes);
});
return jv;

View File

@@ -123,6 +123,13 @@ private:
std::atomic <uint64_t> peerDisconnects_ {0};
std::atomic <uint64_t> peerDisconnectsCharges_ {0};
// Last time we crawled peers for shard info
std::atomic<std::chrono::seconds> csLast_{std::chrono::seconds{0}};
std::mutex csMutex_;
std::condition_variable csCV_;
// Peer IDs expecting to receive a last link notification
std::set<std::uint32_t> csIDs_;
//--------------------------------------------------------------------------
public:
@@ -221,15 +228,17 @@ public:
void
for_each (UnaryFunc&& f)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
// Iterate over a copy of the peer list because peer
// destruction can invalidate iterators.
std::vector<std::weak_ptr<PeerImp>> wp;
wp.reserve(ids_.size());
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
for (auto& x : ids_)
wp.push_back(x.second);
// Iterate over a copy of the peer list because peer
// destruction can invalidate iterators.
wp.reserve(ids_.size());
for (auto& x : ids_)
wp.push_back(x.second);
}
for (auto& w : wp)
{
@@ -340,6 +349,17 @@ public:
return peerDisconnectsCharges_;
}
Json::Value
crawlShards(bool pubKey, std::uint32_t hops) override;
/** Called when the last link from a peer chain is received.
@param id peer id that received the shard info.
*/
void
lastLink(std::uint32_t id);
private:
std::shared_ptr<Writer>
makeRedirectResponse (PeerFinder::Slot::ptr const& slot,

View File

@@ -35,6 +35,7 @@
#include <ripple/beast/core/SemanticVersion.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/predicates.h>
#include <ripple/protocol/digest.h>
#include <boost/algorithm/string/predicate.hpp>
@@ -144,6 +145,11 @@ PeerImp::run()
doProtocolStart();
}
// Request shard info from peer
protocol::TMGetShardInfo tmGS;
tmGS.set_hops(0);
send(std::make_shared<Message>(tmGS, protocol::mtGET_SHARD_INFO));
setTimer();
}
@@ -360,16 +366,20 @@ PeerImp::json()
bool
PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const
{
std::lock_guard<std::mutex> sl(recentLock_);
if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
(sanity_.load() == Sanity::sane))
return true;
if (std::find(recentLedgers_.begin(),
recentLedgers_.end(), hash) != recentLedgers_.end())
return true;
return seq >= app_.getNodeStore().earliestSeq() &&
boost::icl::contains(shards_,
{
std::lock_guard<std::mutex> sl(recentLock_);
if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
(sanity_.load() == Sanity::sane))
return true;
if (std::find(recentLedgers_.begin(),
recentLedgers_.end(), hash) != recentLedgers_.end())
return true;
}
if (seq >= app_.getNodeStore().earliestSeq())
return hasShard(
(seq - 1) / NodeStore::DatabaseShard::ledgersPerShardDefault);
return false;
}
void
@@ -385,19 +395,11 @@ PeerImp::ledgerRange (std::uint32_t& minSeq,
bool
PeerImp::hasShard (std::uint32_t shardIndex) const
{
std::lock_guard<std::mutex> sl(recentLock_);
return boost::icl::contains(shards_, shardIndex);
}
std::string
PeerImp::getShards () const
{
{
std::lock_guard<std::mutex> sl(recentLock_);
if (!shards_.empty())
return to_string(shards_);
}
return {};
std::lock_guard<std::mutex> l {shardInfoMutex_};
auto it {shardInfo_.find(publicKey_)};
if (it != shardInfo_.end())
return boost::icl::contains(it->second.shardIndexes, shardIndex);
return false;
}
bool
@@ -478,6 +480,25 @@ PeerImp::fail(std::string const& name, error_code ec)
close();
}
boost::optional<RangeSet<std::uint32_t>>
PeerImp::getShardIndexes() const
{
std::lock_guard<std::mutex> l {shardInfoMutex_};
auto it{shardInfo_.find(publicKey_)};
if (it != shardInfo_.end())
return it->second.shardIndexes;
return boost::none;
}
boost::optional<hash_map<PublicKey, PeerImp::ShardInfo>>
PeerImp::getPeerShardInfo() const
{
std::lock_guard<std::mutex> l {shardInfoMutex_};
if (!shardInfo_.empty())
return shardInfo_;
return boost::none;
}
void
PeerImp::gracefulClose()
{
@@ -995,6 +1016,154 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMCluster> const& m)
app_.getFeeTrack().setClusterFee(clusterFee);
}
void
PeerImp::onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m)
{
fee_ = Resource::feeMediumBurdenPeer;
// Reply with shard info we may have
if (auto shardStore = app_.getShardStore())
{
auto shards {shardStore->getCompleteShards()};
if (!shards.empty())
{
protocol::TMShardInfo reply;
auto const& publicKey {app_.nodeIdentity().first};
reply.set_nodepubkey(publicKey.data(), publicKey.size());
reply.set_shardindexes(shards);
if (m->has_lastlink())
reply.set_lastlink(true);
if (m->peerchain_size() > 0)
*reply.mutable_peerchain() = m->peerchain();
send(std::make_shared<Message>(reply, protocol::mtSHARD_INFO));
JLOG(p_journal_.trace()) <<
"Sent shard info to peer with IP " <<
remote_address_.address().to_string() <<
" public key " << toBase58(TokenType::NodePublic, publicKey) <<
" shard indexes " << shards;
}
}
// Relay request to peers
if (m->hops() > 0)
{
m->set_hops(m->hops() - 1);
if (m->hops() == 0)
m->set_lastlink(true);
m->add_peerchain(id());
overlay_.foreach(send_if_not(
std::make_shared<Message>(*m, protocol::mtGET_SHARD_INFO),
match_peer(this)));
}
}
void
PeerImp::onMessage(std::shared_ptr <protocol::TMShardInfo> const& m)
{
// Check if the message should be forwarded to another peer
if (m->peerchain_size() > 0)
{
auto const peerId {m->peerchain(m->peerchain_size() - 1)};
if (auto peer = overlay_.findPeerByShortID(peerId))
{
if (!m->has_endpoint())
m->set_endpoint(remote_address_.address().to_string());
m->mutable_peerchain()->RemoveLast();
peer->send(std::make_shared<Message>(*m, protocol::mtSHARD_INFO));
JLOG(p_journal_.trace()) <<
"Relayed TMShardInfo to peer with IP " <<
remote_address_.address().to_string();
}
else
{
// Peer is no longer available so the relay ends
JLOG(p_journal_.info()) <<
"Unable to route shard info";
fee_ = Resource::feeUnwantedData;
}
return;
}
// Consume the shard info received
if (m->shardindexes().empty())
{
JLOG(p_journal_.error()) <<
"Node response missing shard indexes";
return;
}
// Get the IP of the node reporting the shard info
beast::IP::Endpoint address;
if (m->has_endpoint())
{
auto result {beast::IP::Endpoint::from_string_checked(m->endpoint())};
if (!result.second)
{
JLOG(p_journal_.error()) <<
"failed to parse incoming endpoint: {" <<
m->endpoint() << "}";
return;
}
address = std::move(result.first);
}
else
address = remote_address_;
RangeSet<std::uint32_t>* shardIndexes {nullptr};
PublicKey const publicKey(makeSlice(m->nodepubkey()));
std::lock_guard<std::mutex> l {shardInfoMutex_};
auto it {shardInfo_.find(publicKey)};
if (it != shardInfo_.end())
{
// Update the IP address for the node
it->second.endpoint = address;
// Update the shard indexes held by the node
shardIndexes = &(it->second.shardIndexes);
}
else
{
// Add a new node
ShardInfo shardInfo;
shardInfo.endpoint = address;
shardIndexes = &(shardInfo_.emplace(std::move(publicKey),
std::move(shardInfo)).first->second.shardIndexes);
}
// Parse shard indexes
std::vector<std::string> tokens;
boost::split(tokens, m->shardindexes(), boost::algorithm::is_any_of(","));
for (auto const& t : tokens)
{
std::vector<std::string> seqs;
boost::split(seqs, t, boost::algorithm::is_any_of("-"));
if (seqs.size() == 1)
shardIndexes->insert(
beast::lexicalCastThrow<std::uint32_t>(seqs.front()));
else if (seqs.size() == 2)
shardIndexes->insert(range(
beast::lexicalCastThrow<std::uint32_t>(seqs.front()),
beast::lexicalCastThrow<std::uint32_t>(seqs.back())));
}
JLOG(p_journal_.trace()) <<
"Consumed TMShardInfo originating from peer with IP " <<
address.address().to_string() <<
" public key " << toBase58(TokenType::NodePublic, publicKey) <<
" shard indexes " << to_string(*shardIndexes);
if (m->has_lastlink())
overlay_.lastLink(id_);
}
void
PeerImp::onMessage (std::shared_ptr <protocol::TMGetPeers> const& m)
{
@@ -1414,26 +1583,6 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
minLedger_ = 0;
}
if (m->has_shardseqs())
{
std::vector<std::string> tokens;
boost::split(tokens, m->shardseqs(), boost::algorithm::is_any_of(","));
std::lock_guard<std::mutex> sl(recentLock_);
shards_.clear();
for (auto const& t : tokens)
{
std::vector<std::string> seqs;
boost::split(seqs, t, boost::algorithm::is_any_of("-"));
if (seqs.size() == 1)
shards_.insert(
beast::lexicalCastThrow<std::uint32_t>(seqs.front()));
else if (seqs.size() == 2)
shards_.insert(range(
beast::lexicalCastThrow<std::uint32_t>(seqs.front()),
beast::lexicalCastThrow<std::uint32_t>(seqs.back())));
}
}
if (m->has_ledgerseq() &&
app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
{
@@ -1509,9 +1658,6 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
Json::UInt (m->lastseq ());
}
if (m->has_shardseqs())
j[jss::complete_shards] = m->shardseqs();
return j;
});
}

View File

@@ -22,20 +22,23 @@
#include <ripple/app/consensus/RCLCxPeerPos.h>
#include <ripple/basics/Log.h>
#include <ripple/beast/utility/WrappedSink.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/beast/utility/WrappedSink.h>
#include <ripple/overlay/impl/ProtocolMessage.h>
#include <ripple/overlay/impl/OverlayImpl.h>
#include <ripple/peerfinder/PeerfinderManager.h>
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/STTx.h>
#include <ripple/protocol/STValidation.h>
#include <ripple/resource/Fees.h>
#include <boost/endian/conversion.hpp>
#include <boost/optional.hpp>
#include <cstdint>
#include <deque>
#include <queue>
namespace ripple {
class PeerImp
@@ -77,6 +80,12 @@ public:
,sane
};
struct ShardInfo
{
beast::IP::Endpoint endpoint;
RangeSet<std::uint32_t> shardIndexes;
};
using ptr = std::shared_ptr <PeerImp>;
private:
@@ -107,7 +116,7 @@ private:
// Updated at each stage of the connection process to reflect
// the current conditions as closely as possible.
beast::IP::Endpoint remote_address_;
beast::IP::Endpoint const remote_address_;
// These are up here to prevent warnings about order of initializations
//
@@ -126,7 +135,6 @@ private:
//
LedgerIndex minLedger_ = 0;
LedgerIndex maxLedger_ = 0;
RangeSet<std::uint32_t> shards_;
uint256 closedLedgerHash_;
uint256 previousLedgerHash_;
std::deque<uint256> recentLedgers_;
@@ -155,6 +163,9 @@ private:
std::unique_ptr <LoadEvent> load_event_;
bool hopsAware_ = false;
std::mutex mutable shardInfoMutex_;
hash_map<PublicKey, ShardInfo> shardInfo_;
friend class OverlayImpl;
public:
@@ -235,6 +246,7 @@ public:
return id_;
}
/** Returns `true` if this connection will publicly share its IP address. */
bool
crawl() const;
@@ -301,9 +313,6 @@ public:
bool
hasShard (std::uint32_t shardIndex) const override;
std::string
getShards () const override;
bool
hasTxSet (uint256 const& hash) const override;
@@ -326,6 +335,14 @@ public:
void
fail(std::string const& reason);
/** Return a range set of known shard indexes from this peer. */
boost::optional<RangeSet<std::uint32_t>>
getShardIndexes() const;
/** Return any known shard info from this peer and its sub peers. */
boost::optional<hash_map<PublicKey, ShardInfo>>
getPeerShardInfo() const;
private:
void
close();
@@ -412,6 +429,8 @@ public:
void onMessage (std::shared_ptr <protocol::TMManifests> const& m);
void onMessage (std::shared_ptr <protocol::TMPing> const& m);
void onMessage (std::shared_ptr <protocol::TMCluster> const& m);
void onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m);
void onMessage (std::shared_ptr <protocol::TMShardInfo> const& m);
void onMessage (std::shared_ptr <protocol::TMGetPeers> const& m);
void onMessage (std::shared_ptr <protocol::TMPeers> const& m);
void onMessage (std::shared_ptr <protocol::TMEndpoints> const& m);

View File

@@ -46,6 +46,8 @@ protocolMessageName (int type)
case protocol::mtPING: return "ping";
case protocol::mtPROOFOFWORK: return "proof_of_work";
case protocol::mtCLUSTER: return "cluster";
case protocol::mtGET_SHARD_INFO: return "get_shard_info";
case protocol::mtSHARD_INFO: return "shard_info";
case protocol::mtGET_PEERS: return "get_peers";
case protocol::mtPEERS: return "peers";
case protocol::mtENDPOINTS: return "endpoints";
@@ -117,6 +119,8 @@ invokeProtocolMessage (Buffers const& buffers, Handler& handler)
case protocol::mtMANIFESTS: ec = detail::invoke<protocol::TMManifests> (type, buffers, handler); break;
case protocol::mtPING: ec = detail::invoke<protocol::TMPing> (type, buffers, handler); break;
case protocol::mtCLUSTER: ec = detail::invoke<protocol::TMCluster> (type, buffers, handler); break;
case protocol::mtGET_SHARD_INFO:ec = detail::invoke<protocol::TMGetShardInfo> (type, buffers, handler); break;
case protocol::mtSHARD_INFO: ec = detail::invoke<protocol::TMShardInfo>(type, buffers, handler); break;
case protocol::mtGET_PEERS: ec = detail::invoke<protocol::TMGetPeers> (type, buffers, handler); break;
case protocol::mtPEERS: ec = detail::invoke<protocol::TMPeers> (type, buffers, handler); break;
case protocol::mtENDPOINTS: ec = detail::invoke<protocol::TMEndpoints> (type, buffers, handler); break;

View File

@@ -64,6 +64,8 @@ TrafficCount::category TrafficCount::categorize (
if ((type == protocol::mtMANIFESTS) ||
(type == protocol::mtENDPOINTS) ||
(type == protocol::mtGET_SHARD_INFO) ||
(type == protocol::mtSHARD_INFO) ||
(type == protocol::mtPEERS) ||
(type == protocol::mtGET_PEERS))
return TrafficCount::category::CT_overlay;

View File

@@ -8,6 +8,8 @@ enum MessageType
mtPING = 3;
mtPROOFOFWORK = 4;
mtCLUSTER = 5;
mtGET_SHARD_INFO = 10;
mtSHARD_INFO = 11;
mtGET_PEERS = 12;
mtPEERS = 13;
mtENDPOINTS = 15;
@@ -20,8 +22,6 @@ enum MessageType
mtVALIDATION = 41;
mtGET_OBJECTS = 42;
// <available> = 10;
// <available> = 11;
// <available> = 14;
// <available> = 20;
// <available> = 21;
@@ -127,6 +127,23 @@ message TMCluster
repeated TMLoadSource loadSources = 2;
}
// Request info on shards held
message TMGetShardInfo
{
required uint32 hops = 1; // number of hops to travel
optional bool lastLink = 2; // true if last link in the peer chain
repeated uint32 peerchain = 3;
}
// Info about shards held
message TMShardInfo
{
required bytes nodePubKey = 1;
required string shardIndexes = 2; // rangeSet of shard indexes
optional string endpoint = 3; // ipv6 or ipv4 address
optional bool lastLink = 4; // true if last link in the peer chain
repeated uint32 peerchain = 5;
}
// A transaction can have only one input and one output.
// If you want to send an amount that is greater than any single address of yours
@@ -180,7 +197,6 @@ message TMStatusChange
optional uint64 networkTime = 6;
optional uint32 firstSeq = 7;
optional uint32 lastSeq = 8;
optional string shardSeqs = 9;
}

View File

@@ -0,0 +1,82 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2018 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/net/RPCErr.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/protocol/JsonFields.h>
#include <ripple/resource/Fees.h>
#include <ripple/rpc/Context.h>
namespace ripple {
static std::uint32_t constexpr hopLimit = 3;
/** RPC command that reports stored shards by nodes.
{
// Determines if the result includes node public key.
// optional, default is false
pubkey: <bool>
// The maximum number of peer hops to attempt.
// optional, default is zero, maximum is 3
limit: <integer>
}
*/
Json::Value
doCrawlShards(RPC::Context& context)
{
std::uint32_t hops {0};
if (auto const& jv = context.params[jss::limit])
{
if (!(jv.isUInt() || (jv.isInt() && jv.asInt() >= 0)))
{
return RPC::expected_field_error(
jss::limit, "unsigned integer");
}
hops = std::min(jv.asUInt(), hopLimit);
}
bool const pubKey {context.params.isMember(jss::public_key) &&
context.params[jss::public_key].asBool()};
// Collect shard info from peers connected to this server
Json::Value jvResult {context.app.overlay().crawlShards(pubKey, hops)};
// Collect shard info from this server
if (auto shardStore = context.app.getShardStore())
{
if (pubKey)
jvResult[jss::public_key] = toBase58(
TokenType::NodePublic, context.app.nodeIdentity().first);
jvResult[jss::complete_shards] = std::move(
shardStore->getCompleteShards());
}
if (hops == 0)
context.loadType = Resource::feeMediumBurdenRPC;
else
context.loadType = Resource::feeHighBurdenRPC;
return jvResult;
}
} // ripple

View File

@@ -69,6 +69,7 @@ Json::Value doServerInfo (RPC::Context&); // for humans
Json::Value doServerState (RPC::Context&); // for machines
Json::Value doSign (RPC::Context&);
Json::Value doSignFor (RPC::Context&);
Json::Value doCrawlShards (RPC::Context&);
Json::Value doStop (RPC::Context&);
Json::Value doSubmit (RPC::Context&);
Json::Value doSubmitMultiSigned (RPC::Context&);

View File

@@ -104,6 +104,7 @@ Handler const handlerArray[] {
{ "submit_multisigned", byRef (&doSubmitMultiSigned), Role::USER, NEEDS_CURRENT_LEDGER },
{ "server_info", byRef (&doServerInfo), Role::USER, NO_CONDITION },
{ "server_state", byRef (&doServerState), Role::USER, NO_CONDITION },
{ "crawl_shards", byRef (&doCrawlShards), Role::USER, NO_CONDITION },
{ "stop", byRef (&doStop), Role::ADMIN, NO_CONDITION },
{ "transaction_entry", byRef (&doTransactionEntry), Role::USER, NO_CONDITION },
{ "tx", byRef (&doTx), Role::USER, NEEDS_NETWORK_CONNECTION },

View File

@@ -36,6 +36,7 @@
#include <ripple/rpc/handlers/ServerState.cpp>
#include <ripple/rpc/handlers/SignFor.cpp>
#include <ripple/rpc/handlers/SignHandler.cpp>
#include <ripple/rpc/handlers/CrawlShards.cpp>
#include <ripple/rpc/handlers/Stop.cpp>
#include <ripple/rpc/handlers/Submit.cpp>
#include <ripple/rpc/handlers/SubmitMultiSigned.cpp>