From 17e0d098a010ba5738132b280e9f789d0a912d96 Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Tue, 10 Jul 2018 11:11:29 -0400 Subject: [PATCH] Add RPC command shard crawl (RIPD-1663) --- src/ripple/app/consensus/RCLConsensus.cpp | 6 - src/ripple/net/impl/RPCCall.cpp | 1 + .../nodestore/impl/DatabaseShardImp.cpp | 10 + src/ripple/overlay/Overlay.h | 9 + src/ripple/overlay/Peer.h | 1 - src/ripple/overlay/impl/OverlayImpl.cpp | 106 +++++++- src/ripple/overlay/impl/OverlayImpl.h | 34 ++- src/ripple/overlay/impl/PeerImp.cpp | 236 ++++++++++++++---- src/ripple/overlay/impl/PeerImp.h | 31 ++- src/ripple/overlay/impl/ProtocolMessage.h | 4 + src/ripple/overlay/impl/TrafficCount.cpp | 2 + src/ripple/proto/ripple.proto | 22 +- src/ripple/rpc/handlers/CrawlShards.cpp | 82 ++++++ src/ripple/rpc/handlers/Handlers.h | 1 + src/ripple/rpc/impl/Handler.cpp | 1 + src/ripple/unity/rpcx2.cpp | 1 + 16 files changed, 473 insertions(+), 74 deletions(-) create mode 100644 src/ripple/rpc/handlers/CrawlShards.cpp diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index b94dc6ee88..cfe8d480c2 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -628,12 +628,6 @@ RCLConsensus::Adaptor::notify( } s.set_firstseq(uMin); s.set_lastseq(uMax); - if (auto shardStore = app_.getShardStore()) - { - auto shards = shardStore->getCompleteShards(); - if (! shards.empty()) - s.set_shardseqs(shards); - } app_.overlay ().foreach (send_always ( std::make_shared ( s, protocol::mtSTATUS_CHANGE))); diff --git a/src/ripple/net/impl/RPCCall.cpp b/src/ripple/net/impl/RPCCall.cpp index 82fa88893a..79fad9a160 100644 --- a/src/ripple/net/impl/RPCCall.cpp +++ b/src/ripple/net/impl/RPCCall.cpp @@ -1146,6 +1146,7 @@ public: { "submit_multisigned", &RPCParser::parseSubmitMultiSigned, 1, 1 }, { "server_info", &RPCParser::parseServerInfo, 0, 1 }, { "server_state", &RPCParser::parseServerInfo, 0, 1 }, + { "shards", &RPCParser::parseAsIs, 0, 0 }, { "stop", &RPCParser::parseAsIs, 0, 0 }, { "transaction_entry", &RPCParser::parseTransactionEntry, 2, 2 }, { "tx", &RPCParser::parseTx, 1, 2 }, diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 37daad48fe..ce83698a93 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -25,6 +25,8 @@ #include #include #include +#include +#include #include namespace ripple { @@ -513,6 +515,14 @@ DatabaseShardImp::setStored(std::shared_ptr const& ledger) complete_.emplace(incomplete_->index(), std::move(incomplete_)); incomplete_.reset(); updateStats(l); + + // Update peers with new shard index + protocol::TMShardInfo message; + auto const& publicKey {app_.nodeIdentity().first}; + message.set_nodepubkey(publicKey.data(), publicKey.size()); + message.set_shardindexes(std::to_string(shardIndex)); + app_.overlay().foreach(send_always( + std::make_shared(message, protocol::mtSHARD_INFO))); } } diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index 1a2b0ad5f8..8af0ab7bd0 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -241,6 +241,15 @@ public: virtual std::uint64_t getPeerDisconnect() const = 0; virtual void incPeerDisconnectCharges() = 0; virtual std::uint64_t getPeerDisconnectCharges() const = 0; + + /** Returns information reported to the crawl shard RPC command. + + @param hops the maximum jumps the crawler will attempt. + The number of hops achieved is not guaranteed. + */ + virtual + Json::Value + crawlShards(bool pubKey, std::uint32_t hops) = 0; }; struct ScoreHasLedger diff --git a/src/ripple/overlay/Peer.h b/src/ripple/overlay/Peer.h index 7ef03a49d8..64288055c9 100644 --- a/src/ripple/overlay/Peer.h +++ b/src/ripple/overlay/Peer.h @@ -100,7 +100,6 @@ public: virtual bool hasLedger (uint256 const& hash, std::uint32_t seq) const = 0; virtual void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const = 0; virtual bool hasShard (std::uint32_t shardIndex) const = 0; - virtual std::string getShards() const = 0; virtual bool hasTxSet (uint256 const& hash) const = 0; virtual void cycleStatus () = 0; virtual bool supportsVersion (int version) = 0; diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index a56c08f88b..add6a82d4f 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -708,6 +708,98 @@ OverlayImpl::reportTraffic ( m_traffic.addCount (cat, isInbound, number); } +Json::Value +OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops) +{ + using namespace std::chrono; + using namespace std::chrono_literals; + + Json::Value jv(Json::objectValue); + auto const numPeers {size()}; + if (numPeers == 0) + return jv; + + // If greater than a hop away, we may need to gather or freshen data + if (hops > 0) + { + // Prevent crawl spamming + clock_type::time_point const last(csLast_.load()); + if (duration_cast(clock_type::now() - last) > 60s) + { + auto const timeout(seconds((hops * hops) * 10)); + std::unique_lock l {csMutex_}; + + // Check if already requested + if (csIDs_.empty()) + { + { + std::lock_guard lock {mutex_}; + for (auto& id : ids_) + csIDs_.emplace(id.first); + } + + // Relay request to active peers + protocol::TMGetShardInfo tmGS; + tmGS.set_hops(hops); + foreach(send_always(std::make_shared( + tmGS, protocol::mtGET_SHARD_INFO))); + + if (csCV_.wait_for(l, timeout) == std::cv_status::timeout) + { + csIDs_.clear(); + csCV_.notify_all(); + } + csLast_ = duration_cast( + clock_type::now().time_since_epoch()); + } + else + csCV_.wait_for(l, timeout); + } + } + + // Combine the shard info from peers and their sub peers + hash_map peerShardInfo; + for_each([&](std::shared_ptr&& peer) + { + if (auto psi = peer->getPeerShardInfo()) + { + for (auto const& e : *psi) + { + auto it {peerShardInfo.find(e.first)}; + if (it != peerShardInfo.end()) + // The key exists so join the shard indexes. + it->second.shardIndexes += e.second.shardIndexes; + else + peerShardInfo.emplace(std::move(e)); + } + } + }); + + // Prepare json reply + auto& av = jv[jss::peers] = Json::Value(Json::arrayValue); + for (auto const& e : peerShardInfo) + { + auto& pv {av.append(Json::Value(Json::objectValue))}; + if (pubKey) + pv[jss::public_key] = toBase58(TokenType::NodePublic, e.first); + pv[jss::ip] = e.second.endpoint.address().to_string(); + pv[jss::complete_shards] = to_string(e.second.shardIndexes); + } + + return jv; +} + +void +OverlayImpl::lastLink(std::uint32_t id) +{ + // Notify threads when every peer has received a last link. + // This doesn't account for every node that might reply but + // it is adequate. + std::lock_guard l {csMutex_}; + if (csIDs_.erase(id) && csIDs_.empty()) + csCV_.notify_all(); +} + std::size_t OverlayImpl::selectPeers (PeerSet& set, std::size_t limit, std::function const&)> score) @@ -787,9 +879,12 @@ OverlayImpl::crawl() sp->getRemoteAddress().port()); } } - auto version = sp->getVersion (); - if (! version.empty ()) - pv[jss::version] = version; + + { + auto version {sp->getVersion()}; + if (!version.empty()) + pv[jss::version] = std::move(version); + } std::uint32_t minSeq, maxSeq; sp->ledgerRange(minSeq, maxSeq); @@ -798,9 +893,8 @@ OverlayImpl::crawl() std::to_string(minSeq) + "-" + std::to_string(maxSeq); - auto shards = sp->getShards(); - if (! shards.empty()) - pv[jss::complete_shards] = shards; + if (auto shardIndexes = sp->getShardIndexes()) + pv[jss::complete_shards] = to_string(shardIndexes); }); return jv; diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 1e41b916c9..7d83a5e40d 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -123,6 +123,13 @@ private: std::atomic peerDisconnects_ {0}; std::atomic peerDisconnectsCharges_ {0}; + // Last time we crawled peers for shard info + std::atomic csLast_{std::chrono::seconds{0}}; + std::mutex csMutex_; + std::condition_variable csCV_; + // Peer IDs expecting to receive a last link notification + std::set csIDs_; + //-------------------------------------------------------------------------- public: @@ -221,15 +228,17 @@ public: void for_each (UnaryFunc&& f) { - std::lock_guard lock (mutex_); - - // Iterate over a copy of the peer list because peer - // destruction can invalidate iterators. std::vector> wp; - wp.reserve(ids_.size()); + { + std::lock_guard lock(mutex_); - for (auto& x : ids_) - wp.push_back(x.second); + // Iterate over a copy of the peer list because peer + // destruction can invalidate iterators. + wp.reserve(ids_.size()); + + for (auto& x : ids_) + wp.push_back(x.second); + } for (auto& w : wp) { @@ -340,6 +349,17 @@ public: return peerDisconnectsCharges_; } + Json::Value + crawlShards(bool pubKey, std::uint32_t hops) override; + + + /** Called when the last link from a peer chain is received. + + @param id peer id that received the shard info. + */ + void + lastLink(std::uint32_t id); + private: std::shared_ptr makeRedirectResponse (PeerFinder::Slot::ptr const& slot, diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 230f88beca..b838246fc0 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include @@ -144,6 +145,11 @@ PeerImp::run() doProtocolStart(); } + // Request shard info from peer + protocol::TMGetShardInfo tmGS; + tmGS.set_hops(0); + send(std::make_shared(tmGS, protocol::mtGET_SHARD_INFO)); + setTimer(); } @@ -360,16 +366,20 @@ PeerImp::json() bool PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const { - std::lock_guard sl(recentLock_); - if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) && - (sanity_.load() == Sanity::sane)) - return true; - if (std::find(recentLedgers_.begin(), - recentLedgers_.end(), hash) != recentLedgers_.end()) - return true; - return seq >= app_.getNodeStore().earliestSeq() && - boost::icl::contains(shards_, + { + std::lock_guard sl(recentLock_); + if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) && + (sanity_.load() == Sanity::sane)) + return true; + if (std::find(recentLedgers_.begin(), + recentLedgers_.end(), hash) != recentLedgers_.end()) + return true; + } + + if (seq >= app_.getNodeStore().earliestSeq()) + return hasShard( (seq - 1) / NodeStore::DatabaseShard::ledgersPerShardDefault); + return false; } void @@ -385,19 +395,11 @@ PeerImp::ledgerRange (std::uint32_t& minSeq, bool PeerImp::hasShard (std::uint32_t shardIndex) const { - std::lock_guard sl(recentLock_); - return boost::icl::contains(shards_, shardIndex); -} - -std::string -PeerImp::getShards () const -{ - { - std::lock_guard sl(recentLock_); - if (!shards_.empty()) - return to_string(shards_); - } - return {}; + std::lock_guard l {shardInfoMutex_}; + auto it {shardInfo_.find(publicKey_)}; + if (it != shardInfo_.end()) + return boost::icl::contains(it->second.shardIndexes, shardIndex); + return false; } bool @@ -478,6 +480,25 @@ PeerImp::fail(std::string const& name, error_code ec) close(); } +boost::optional> +PeerImp::getShardIndexes() const +{ + std::lock_guard l {shardInfoMutex_}; + auto it{shardInfo_.find(publicKey_)}; + if (it != shardInfo_.end()) + return it->second.shardIndexes; + return boost::none; +} + +boost::optional> +PeerImp::getPeerShardInfo() const +{ + std::lock_guard l {shardInfoMutex_}; + if (!shardInfo_.empty()) + return shardInfo_; + return boost::none; +} + void PeerImp::gracefulClose() { @@ -995,6 +1016,154 @@ PeerImp::onMessage (std::shared_ptr const& m) app_.getFeeTrack().setClusterFee(clusterFee); } +void +PeerImp::onMessage (std::shared_ptr const& m) +{ + fee_ = Resource::feeMediumBurdenPeer; + + // Reply with shard info we may have + if (auto shardStore = app_.getShardStore()) + { + auto shards {shardStore->getCompleteShards()}; + if (!shards.empty()) + { + protocol::TMShardInfo reply; + auto const& publicKey {app_.nodeIdentity().first}; + reply.set_nodepubkey(publicKey.data(), publicKey.size()); + reply.set_shardindexes(shards); + + if (m->has_lastlink()) + reply.set_lastlink(true); + + if (m->peerchain_size() > 0) + *reply.mutable_peerchain() = m->peerchain(); + + send(std::make_shared(reply, protocol::mtSHARD_INFO)); + + JLOG(p_journal_.trace()) << + "Sent shard info to peer with IP " << + remote_address_.address().to_string() << + " public key " << toBase58(TokenType::NodePublic, publicKey) << + " shard indexes " << shards; + } + } + + // Relay request to peers + if (m->hops() > 0) + { + m->set_hops(m->hops() - 1); + if (m->hops() == 0) + m->set_lastlink(true); + + m->add_peerchain(id()); + overlay_.foreach(send_if_not( + std::make_shared(*m, protocol::mtGET_SHARD_INFO), + match_peer(this))); + } +} + +void +PeerImp::onMessage(std::shared_ptr const& m) +{ + // Check if the message should be forwarded to another peer + if (m->peerchain_size() > 0) + { + auto const peerId {m->peerchain(m->peerchain_size() - 1)}; + if (auto peer = overlay_.findPeerByShortID(peerId)) + { + if (!m->has_endpoint()) + m->set_endpoint(remote_address_.address().to_string()); + + m->mutable_peerchain()->RemoveLast(); + peer->send(std::make_shared(*m, protocol::mtSHARD_INFO)); + + JLOG(p_journal_.trace()) << + "Relayed TMShardInfo to peer with IP " << + remote_address_.address().to_string(); + } + else + { + // Peer is no longer available so the relay ends + JLOG(p_journal_.info()) << + "Unable to route shard info"; + fee_ = Resource::feeUnwantedData; + } + return; + } + + // Consume the shard info received + if (m->shardindexes().empty()) + { + JLOG(p_journal_.error()) << + "Node response missing shard indexes"; + return; + } + + // Get the IP of the node reporting the shard info + beast::IP::Endpoint address; + if (m->has_endpoint()) + { + auto result {beast::IP::Endpoint::from_string_checked(m->endpoint())}; + if (!result.second) + { + JLOG(p_journal_.error()) << + "failed to parse incoming endpoint: {" << + m->endpoint() << "}"; + return; + } + address = std::move(result.first); + } + else + address = remote_address_; + + RangeSet* shardIndexes {nullptr}; + PublicKey const publicKey(makeSlice(m->nodepubkey())); + + std::lock_guard l {shardInfoMutex_}; + auto it {shardInfo_.find(publicKey)}; + if (it != shardInfo_.end()) + { + // Update the IP address for the node + it->second.endpoint = address; + + // Update the shard indexes held by the node + shardIndexes = &(it->second.shardIndexes); + } + else + { + // Add a new node + ShardInfo shardInfo; + shardInfo.endpoint = address; + shardIndexes = &(shardInfo_.emplace(std::move(publicKey), + std::move(shardInfo)).first->second.shardIndexes); + } + + // Parse shard indexes + std::vector tokens; + boost::split(tokens, m->shardindexes(), boost::algorithm::is_any_of(",")); + for (auto const& t : tokens) + { + std::vector seqs; + boost::split(seqs, t, boost::algorithm::is_any_of("-")); + if (seqs.size() == 1) + shardIndexes->insert( + beast::lexicalCastThrow(seqs.front())); + else if (seqs.size() == 2) + shardIndexes->insert(range( + beast::lexicalCastThrow(seqs.front()), + beast::lexicalCastThrow(seqs.back()))); + } + + JLOG(p_journal_.trace()) << + "Consumed TMShardInfo originating from peer with IP " << + address.address().to_string() << + " public key " << toBase58(TokenType::NodePublic, publicKey) << + " shard indexes " << to_string(*shardIndexes); + + if (m->has_lastlink()) + overlay_.lastLink(id_); +} + void PeerImp::onMessage (std::shared_ptr const& m) { @@ -1414,26 +1583,6 @@ PeerImp::onMessage (std::shared_ptr const& m) minLedger_ = 0; } - if (m->has_shardseqs()) - { - std::vector tokens; - boost::split(tokens, m->shardseqs(), boost::algorithm::is_any_of(",")); - std::lock_guard sl(recentLock_); - shards_.clear(); - for (auto const& t : tokens) - { - std::vector seqs; - boost::split(seqs, t, boost::algorithm::is_any_of("-")); - if (seqs.size() == 1) - shards_.insert( - beast::lexicalCastThrow(seqs.front())); - else if (seqs.size() == 2) - shards_.insert(range( - beast::lexicalCastThrow(seqs.front()), - beast::lexicalCastThrow(seqs.back()))); - } - } - if (m->has_ledgerseq() && app_.getLedgerMaster().getValidatedLedgerAge() < 2min) { @@ -1509,9 +1658,6 @@ PeerImp::onMessage (std::shared_ptr const& m) Json::UInt (m->lastseq ()); } - if (m->has_shardseqs()) - j[jss::complete_shards] = m->shardseqs(); - return j; }); } diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index ac8888f67c..65a05bc7da 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -22,20 +22,23 @@ #include #include -#include #include +#include #include #include +#include #include #include #include #include #include +#include #include #include #include + namespace ripple { class PeerImp @@ -77,6 +80,12 @@ public: ,sane }; + struct ShardInfo + { + beast::IP::Endpoint endpoint; + RangeSet shardIndexes; + }; + using ptr = std::shared_ptr ; private: @@ -107,7 +116,7 @@ private: // Updated at each stage of the connection process to reflect // the current conditions as closely as possible. - beast::IP::Endpoint remote_address_; + beast::IP::Endpoint const remote_address_; // These are up here to prevent warnings about order of initializations // @@ -126,7 +135,6 @@ private: // LedgerIndex minLedger_ = 0; LedgerIndex maxLedger_ = 0; - RangeSet shards_; uint256 closedLedgerHash_; uint256 previousLedgerHash_; std::deque recentLedgers_; @@ -155,6 +163,9 @@ private: std::unique_ptr load_event_; bool hopsAware_ = false; + std::mutex mutable shardInfoMutex_; + hash_map shardInfo_; + friend class OverlayImpl; public: @@ -235,6 +246,7 @@ public: return id_; } + /** Returns `true` if this connection will publicly share its IP address. */ bool crawl() const; @@ -301,9 +313,6 @@ public: bool hasShard (std::uint32_t shardIndex) const override; - std::string - getShards () const override; - bool hasTxSet (uint256 const& hash) const override; @@ -326,6 +335,14 @@ public: void fail(std::string const& reason); + /** Return a range set of known shard indexes from this peer. */ + boost::optional> + getShardIndexes() const; + + /** Return any known shard info from this peer and its sub peers. */ + boost::optional> + getPeerShardInfo() const; + private: void close(); @@ -412,6 +429,8 @@ public: void onMessage (std::shared_ptr const& m); void onMessage (std::shared_ptr const& m); void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); void onMessage (std::shared_ptr const& m); void onMessage (std::shared_ptr const& m); void onMessage (std::shared_ptr const& m); diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index f4c093d56f..aab8327946 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -46,6 +46,8 @@ protocolMessageName (int type) case protocol::mtPING: return "ping"; case protocol::mtPROOFOFWORK: return "proof_of_work"; case protocol::mtCLUSTER: return "cluster"; + case protocol::mtGET_SHARD_INFO: return "get_shard_info"; + case protocol::mtSHARD_INFO: return "shard_info"; case protocol::mtGET_PEERS: return "get_peers"; case protocol::mtPEERS: return "peers"; case protocol::mtENDPOINTS: return "endpoints"; @@ -117,6 +119,8 @@ invokeProtocolMessage (Buffers const& buffers, Handler& handler) case protocol::mtMANIFESTS: ec = detail::invoke (type, buffers, handler); break; case protocol::mtPING: ec = detail::invoke (type, buffers, handler); break; case protocol::mtCLUSTER: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtGET_SHARD_INFO:ec = detail::invoke (type, buffers, handler); break; + case protocol::mtSHARD_INFO: ec = detail::invoke(type, buffers, handler); break; case protocol::mtGET_PEERS: ec = detail::invoke (type, buffers, handler); break; case protocol::mtPEERS: ec = detail::invoke (type, buffers, handler); break; case protocol::mtENDPOINTS: ec = detail::invoke (type, buffers, handler); break; diff --git a/src/ripple/overlay/impl/TrafficCount.cpp b/src/ripple/overlay/impl/TrafficCount.cpp index 40815d748e..9e9158ffd3 100644 --- a/src/ripple/overlay/impl/TrafficCount.cpp +++ b/src/ripple/overlay/impl/TrafficCount.cpp @@ -64,6 +64,8 @@ TrafficCount::category TrafficCount::categorize ( if ((type == protocol::mtMANIFESTS) || (type == protocol::mtENDPOINTS) || + (type == protocol::mtGET_SHARD_INFO) || + (type == protocol::mtSHARD_INFO) || (type == protocol::mtPEERS) || (type == protocol::mtGET_PEERS)) return TrafficCount::category::CT_overlay; diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 4f8b9b1172..c7bd14aea7 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -8,6 +8,8 @@ enum MessageType mtPING = 3; mtPROOFOFWORK = 4; mtCLUSTER = 5; + mtGET_SHARD_INFO = 10; + mtSHARD_INFO = 11; mtGET_PEERS = 12; mtPEERS = 13; mtENDPOINTS = 15; @@ -20,8 +22,6 @@ enum MessageType mtVALIDATION = 41; mtGET_OBJECTS = 42; - // = 10; - // = 11; // = 14; // = 20; // = 21; @@ -127,6 +127,23 @@ message TMCluster repeated TMLoadSource loadSources = 2; } +// Request info on shards held +message TMGetShardInfo +{ + required uint32 hops = 1; // number of hops to travel + optional bool lastLink = 2; // true if last link in the peer chain + repeated uint32 peerchain = 3; +} + +// Info about shards held +message TMShardInfo +{ + required bytes nodePubKey = 1; + required string shardIndexes = 2; // rangeSet of shard indexes + optional string endpoint = 3; // ipv6 or ipv4 address + optional bool lastLink = 4; // true if last link in the peer chain + repeated uint32 peerchain = 5; +} // A transaction can have only one input and one output. // If you want to send an amount that is greater than any single address of yours @@ -180,7 +197,6 @@ message TMStatusChange optional uint64 networkTime = 6; optional uint32 firstSeq = 7; optional uint32 lastSeq = 8; - optional string shardSeqs = 9; } diff --git a/src/ripple/rpc/handlers/CrawlShards.cpp b/src/ripple/rpc/handlers/CrawlShards.cpp new file mode 100644 index 0000000000..d133ac2bb9 --- /dev/null +++ b/src/ripple/rpc/handlers/CrawlShards.cpp @@ -0,0 +1,82 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2018 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { + +static std::uint32_t constexpr hopLimit = 3; + +/** RPC command that reports stored shards by nodes. + { + // Determines if the result includes node public key. + // optional, default is false + pubkey: + + // The maximum number of peer hops to attempt. + // optional, default is zero, maximum is 3 + limit: + } +*/ +Json::Value +doCrawlShards(RPC::Context& context) +{ + std::uint32_t hops {0}; + if (auto const& jv = context.params[jss::limit]) + { + if (!(jv.isUInt() || (jv.isInt() && jv.asInt() >= 0))) + { + return RPC::expected_field_error( + jss::limit, "unsigned integer"); + } + + hops = std::min(jv.asUInt(), hopLimit); + } + + bool const pubKey {context.params.isMember(jss::public_key) && + context.params[jss::public_key].asBool()}; + + // Collect shard info from peers connected to this server + Json::Value jvResult {context.app.overlay().crawlShards(pubKey, hops)}; + + // Collect shard info from this server + if (auto shardStore = context.app.getShardStore()) + { + if (pubKey) + jvResult[jss::public_key] = toBase58( + TokenType::NodePublic, context.app.nodeIdentity().first); + jvResult[jss::complete_shards] = std::move( + shardStore->getCompleteShards()); + } + + if (hops == 0) + context.loadType = Resource::feeMediumBurdenRPC; + else + context.loadType = Resource::feeHighBurdenRPC; + + return jvResult; +} + +} // ripple diff --git a/src/ripple/rpc/handlers/Handlers.h b/src/ripple/rpc/handlers/Handlers.h index b5f82bc654..d938fd6afc 100644 --- a/src/ripple/rpc/handlers/Handlers.h +++ b/src/ripple/rpc/handlers/Handlers.h @@ -69,6 +69,7 @@ Json::Value doServerInfo (RPC::Context&); // for humans Json::Value doServerState (RPC::Context&); // for machines Json::Value doSign (RPC::Context&); Json::Value doSignFor (RPC::Context&); +Json::Value doCrawlShards (RPC::Context&); Json::Value doStop (RPC::Context&); Json::Value doSubmit (RPC::Context&); Json::Value doSubmitMultiSigned (RPC::Context&); diff --git a/src/ripple/rpc/impl/Handler.cpp b/src/ripple/rpc/impl/Handler.cpp index f6e38778e3..72ab0bb75b 100644 --- a/src/ripple/rpc/impl/Handler.cpp +++ b/src/ripple/rpc/impl/Handler.cpp @@ -104,6 +104,7 @@ Handler const handlerArray[] { { "submit_multisigned", byRef (&doSubmitMultiSigned), Role::USER, NEEDS_CURRENT_LEDGER }, { "server_info", byRef (&doServerInfo), Role::USER, NO_CONDITION }, { "server_state", byRef (&doServerState), Role::USER, NO_CONDITION }, + { "crawl_shards", byRef (&doCrawlShards), Role::USER, NO_CONDITION }, { "stop", byRef (&doStop), Role::ADMIN, NO_CONDITION }, { "transaction_entry", byRef (&doTransactionEntry), Role::USER, NO_CONDITION }, { "tx", byRef (&doTx), Role::USER, NEEDS_NETWORK_CONNECTION }, diff --git a/src/ripple/unity/rpcx2.cpp b/src/ripple/unity/rpcx2.cpp index 37f82dbc54..1c36a40957 100644 --- a/src/ripple/unity/rpcx2.cpp +++ b/src/ripple/unity/rpcx2.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include