mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-20 02:55:50 +00:00
Add RPC command shard crawl (RIPD-1663)
This commit is contained in:
@@ -628,12 +628,6 @@ RCLConsensus::Adaptor::notify(
|
||||
}
|
||||
s.set_firstseq(uMin);
|
||||
s.set_lastseq(uMax);
|
||||
if (auto shardStore = app_.getShardStore())
|
||||
{
|
||||
auto shards = shardStore->getCompleteShards();
|
||||
if (! shards.empty())
|
||||
s.set_shardseqs(shards);
|
||||
}
|
||||
app_.overlay ().foreach (send_always (
|
||||
std::make_shared <Message> (
|
||||
s, protocol::mtSTATUS_CHANGE)));
|
||||
|
||||
@@ -1146,6 +1146,7 @@ public:
|
||||
{ "submit_multisigned", &RPCParser::parseSubmitMultiSigned, 1, 1 },
|
||||
{ "server_info", &RPCParser::parseServerInfo, 0, 1 },
|
||||
{ "server_state", &RPCParser::parseServerInfo, 0, 1 },
|
||||
{ "shards", &RPCParser::parseAsIs, 0, 0 },
|
||||
{ "stop", &RPCParser::parseAsIs, 0, 0 },
|
||||
{ "transaction_entry", &RPCParser::parseTransactionEntry, 2, 2 },
|
||||
{ "tx", &RPCParser::parseTx, 1, 2 },
|
||||
|
||||
@@ -25,6 +25,8 @@
|
||||
#include <ripple/basics/random.h>
|
||||
#include <ripple/nodestore/DummyScheduler.h>
|
||||
#include <ripple/nodestore/Manager.h>
|
||||
#include <ripple/overlay/Overlay.h>
|
||||
#include <ripple/overlay/predicates.h>
|
||||
#include <ripple/protocol/HashPrefix.h>
|
||||
|
||||
namespace ripple {
|
||||
@@ -513,6 +515,14 @@ DatabaseShardImp::setStored(std::shared_ptr<Ledger const> const& ledger)
|
||||
complete_.emplace(incomplete_->index(), std::move(incomplete_));
|
||||
incomplete_.reset();
|
||||
updateStats(l);
|
||||
|
||||
// Update peers with new shard index
|
||||
protocol::TMShardInfo message;
|
||||
auto const& publicKey {app_.nodeIdentity().first};
|
||||
message.set_nodepubkey(publicKey.data(), publicKey.size());
|
||||
message.set_shardindexes(std::to_string(shardIndex));
|
||||
app_.overlay().foreach(send_always(
|
||||
std::make_shared<Message>(message, protocol::mtSHARD_INFO)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -241,6 +241,15 @@ public:
|
||||
virtual std::uint64_t getPeerDisconnect() const = 0;
|
||||
virtual void incPeerDisconnectCharges() = 0;
|
||||
virtual std::uint64_t getPeerDisconnectCharges() const = 0;
|
||||
|
||||
/** Returns information reported to the crawl shard RPC command.
|
||||
|
||||
@param hops the maximum jumps the crawler will attempt.
|
||||
The number of hops achieved is not guaranteed.
|
||||
*/
|
||||
virtual
|
||||
Json::Value
|
||||
crawlShards(bool pubKey, std::uint32_t hops) = 0;
|
||||
};
|
||||
|
||||
struct ScoreHasLedger
|
||||
|
||||
@@ -100,7 +100,6 @@ public:
|
||||
virtual bool hasLedger (uint256 const& hash, std::uint32_t seq) const = 0;
|
||||
virtual void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const = 0;
|
||||
virtual bool hasShard (std::uint32_t shardIndex) const = 0;
|
||||
virtual std::string getShards() const = 0;
|
||||
virtual bool hasTxSet (uint256 const& hash) const = 0;
|
||||
virtual void cycleStatus () = 0;
|
||||
virtual bool supportsVersion (int version) = 0;
|
||||
|
||||
@@ -708,6 +708,98 @@ OverlayImpl::reportTraffic (
|
||||
m_traffic.addCount (cat, isInbound, number);
|
||||
}
|
||||
|
||||
Json::Value
|
||||
OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops)
|
||||
{
|
||||
using namespace std::chrono;
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
Json::Value jv(Json::objectValue);
|
||||
auto const numPeers {size()};
|
||||
if (numPeers == 0)
|
||||
return jv;
|
||||
|
||||
// If greater than a hop away, we may need to gather or freshen data
|
||||
if (hops > 0)
|
||||
{
|
||||
// Prevent crawl spamming
|
||||
clock_type::time_point const last(csLast_.load());
|
||||
if (duration_cast<seconds>(clock_type::now() - last) > 60s)
|
||||
{
|
||||
auto const timeout(seconds((hops * hops) * 10));
|
||||
std::unique_lock<std::mutex> l {csMutex_};
|
||||
|
||||
// Check if already requested
|
||||
if (csIDs_.empty())
|
||||
{
|
||||
{
|
||||
std::lock_guard <decltype(mutex_)> lock {mutex_};
|
||||
for (auto& id : ids_)
|
||||
csIDs_.emplace(id.first);
|
||||
}
|
||||
|
||||
// Relay request to active peers
|
||||
protocol::TMGetShardInfo tmGS;
|
||||
tmGS.set_hops(hops);
|
||||
foreach(send_always(std::make_shared<Message>(
|
||||
tmGS, protocol::mtGET_SHARD_INFO)));
|
||||
|
||||
if (csCV_.wait_for(l, timeout) == std::cv_status::timeout)
|
||||
{
|
||||
csIDs_.clear();
|
||||
csCV_.notify_all();
|
||||
}
|
||||
csLast_ = duration_cast<seconds>(
|
||||
clock_type::now().time_since_epoch());
|
||||
}
|
||||
else
|
||||
csCV_.wait_for(l, timeout);
|
||||
}
|
||||
}
|
||||
|
||||
// Combine the shard info from peers and their sub peers
|
||||
hash_map<PublicKey, PeerImp::ShardInfo> peerShardInfo;
|
||||
for_each([&](std::shared_ptr<PeerImp>&& peer)
|
||||
{
|
||||
if (auto psi = peer->getPeerShardInfo())
|
||||
{
|
||||
for (auto const& e : *psi)
|
||||
{
|
||||
auto it {peerShardInfo.find(e.first)};
|
||||
if (it != peerShardInfo.end())
|
||||
// The key exists so join the shard indexes.
|
||||
it->second.shardIndexes += e.second.shardIndexes;
|
||||
else
|
||||
peerShardInfo.emplace(std::move(e));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Prepare json reply
|
||||
auto& av = jv[jss::peers] = Json::Value(Json::arrayValue);
|
||||
for (auto const& e : peerShardInfo)
|
||||
{
|
||||
auto& pv {av.append(Json::Value(Json::objectValue))};
|
||||
if (pubKey)
|
||||
pv[jss::public_key] = toBase58(TokenType::NodePublic, e.first);
|
||||
pv[jss::ip] = e.second.endpoint.address().to_string();
|
||||
pv[jss::complete_shards] = to_string(e.second.shardIndexes);
|
||||
}
|
||||
|
||||
return jv;
|
||||
}
|
||||
|
||||
void
|
||||
OverlayImpl::lastLink(std::uint32_t id)
|
||||
{
|
||||
// Notify threads when every peer has received a last link.
|
||||
// This doesn't account for every node that might reply but
|
||||
// it is adequate.
|
||||
std::lock_guard<std::mutex> l {csMutex_};
|
||||
if (csIDs_.erase(id) && csIDs_.empty())
|
||||
csCV_.notify_all();
|
||||
}
|
||||
|
||||
std::size_t
|
||||
OverlayImpl::selectPeers (PeerSet& set, std::size_t limit,
|
||||
std::function<bool(std::shared_ptr<Peer> const&)> score)
|
||||
@@ -787,9 +879,12 @@ OverlayImpl::crawl()
|
||||
sp->getRemoteAddress().port());
|
||||
}
|
||||
}
|
||||
auto version = sp->getVersion ();
|
||||
if (! version.empty ())
|
||||
pv[jss::version] = version;
|
||||
|
||||
{
|
||||
auto version {sp->getVersion()};
|
||||
if (!version.empty())
|
||||
pv[jss::version] = std::move(version);
|
||||
}
|
||||
|
||||
std::uint32_t minSeq, maxSeq;
|
||||
sp->ledgerRange(minSeq, maxSeq);
|
||||
@@ -798,9 +893,8 @@ OverlayImpl::crawl()
|
||||
std::to_string(minSeq) + "-" +
|
||||
std::to_string(maxSeq);
|
||||
|
||||
auto shards = sp->getShards();
|
||||
if (! shards.empty())
|
||||
pv[jss::complete_shards] = shards;
|
||||
if (auto shardIndexes = sp->getShardIndexes())
|
||||
pv[jss::complete_shards] = to_string(shardIndexes);
|
||||
});
|
||||
|
||||
return jv;
|
||||
|
||||
@@ -123,6 +123,13 @@ private:
|
||||
std::atomic <uint64_t> peerDisconnects_ {0};
|
||||
std::atomic <uint64_t> peerDisconnectsCharges_ {0};
|
||||
|
||||
// Last time we crawled peers for shard info
|
||||
std::atomic<std::chrono::seconds> csLast_{std::chrono::seconds{0}};
|
||||
std::mutex csMutex_;
|
||||
std::condition_variable csCV_;
|
||||
// Peer IDs expecting to receive a last link notification
|
||||
std::set<std::uint32_t> csIDs_;
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
public:
|
||||
@@ -221,15 +228,17 @@ public:
|
||||
void
|
||||
for_each (UnaryFunc&& f)
|
||||
{
|
||||
std::lock_guard <decltype(mutex_)> lock (mutex_);
|
||||
|
||||
// Iterate over a copy of the peer list because peer
|
||||
// destruction can invalidate iterators.
|
||||
std::vector<std::weak_ptr<PeerImp>> wp;
|
||||
wp.reserve(ids_.size());
|
||||
{
|
||||
std::lock_guard<decltype(mutex_)> lock(mutex_);
|
||||
|
||||
for (auto& x : ids_)
|
||||
wp.push_back(x.second);
|
||||
// Iterate over a copy of the peer list because peer
|
||||
// destruction can invalidate iterators.
|
||||
wp.reserve(ids_.size());
|
||||
|
||||
for (auto& x : ids_)
|
||||
wp.push_back(x.second);
|
||||
}
|
||||
|
||||
for (auto& w : wp)
|
||||
{
|
||||
@@ -340,6 +349,17 @@ public:
|
||||
return peerDisconnectsCharges_;
|
||||
}
|
||||
|
||||
Json::Value
|
||||
crawlShards(bool pubKey, std::uint32_t hops) override;
|
||||
|
||||
|
||||
/** Called when the last link from a peer chain is received.
|
||||
|
||||
@param id peer id that received the shard info.
|
||||
*/
|
||||
void
|
||||
lastLink(std::uint32_t id);
|
||||
|
||||
private:
|
||||
std::shared_ptr<Writer>
|
||||
makeRedirectResponse (PeerFinder::Slot::ptr const& slot,
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include <ripple/beast/core/SemanticVersion.h>
|
||||
#include <ripple/nodestore/DatabaseShard.h>
|
||||
#include <ripple/overlay/Cluster.h>
|
||||
#include <ripple/overlay/predicates.h>
|
||||
#include <ripple/protocol/digest.h>
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
@@ -144,6 +145,11 @@ PeerImp::run()
|
||||
doProtocolStart();
|
||||
}
|
||||
|
||||
// Request shard info from peer
|
||||
protocol::TMGetShardInfo tmGS;
|
||||
tmGS.set_hops(0);
|
||||
send(std::make_shared<Message>(tmGS, protocol::mtGET_SHARD_INFO));
|
||||
|
||||
setTimer();
|
||||
}
|
||||
|
||||
@@ -360,16 +366,20 @@ PeerImp::json()
|
||||
bool
|
||||
PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const
|
||||
{
|
||||
std::lock_guard<std::mutex> sl(recentLock_);
|
||||
if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
|
||||
(sanity_.load() == Sanity::sane))
|
||||
return true;
|
||||
if (std::find(recentLedgers_.begin(),
|
||||
recentLedgers_.end(), hash) != recentLedgers_.end())
|
||||
return true;
|
||||
return seq >= app_.getNodeStore().earliestSeq() &&
|
||||
boost::icl::contains(shards_,
|
||||
{
|
||||
std::lock_guard<std::mutex> sl(recentLock_);
|
||||
if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
|
||||
(sanity_.load() == Sanity::sane))
|
||||
return true;
|
||||
if (std::find(recentLedgers_.begin(),
|
||||
recentLedgers_.end(), hash) != recentLedgers_.end())
|
||||
return true;
|
||||
}
|
||||
|
||||
if (seq >= app_.getNodeStore().earliestSeq())
|
||||
return hasShard(
|
||||
(seq - 1) / NodeStore::DatabaseShard::ledgersPerShardDefault);
|
||||
return false;
|
||||
}
|
||||
|
||||
void
|
||||
@@ -385,19 +395,11 @@ PeerImp::ledgerRange (std::uint32_t& minSeq,
|
||||
bool
|
||||
PeerImp::hasShard (std::uint32_t shardIndex) const
|
||||
{
|
||||
std::lock_guard<std::mutex> sl(recentLock_);
|
||||
return boost::icl::contains(shards_, shardIndex);
|
||||
}
|
||||
|
||||
std::string
|
||||
PeerImp::getShards () const
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> sl(recentLock_);
|
||||
if (!shards_.empty())
|
||||
return to_string(shards_);
|
||||
}
|
||||
return {};
|
||||
std::lock_guard<std::mutex> l {shardInfoMutex_};
|
||||
auto it {shardInfo_.find(publicKey_)};
|
||||
if (it != shardInfo_.end())
|
||||
return boost::icl::contains(it->second.shardIndexes, shardIndex);
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -478,6 +480,25 @@ PeerImp::fail(std::string const& name, error_code ec)
|
||||
close();
|
||||
}
|
||||
|
||||
boost::optional<RangeSet<std::uint32_t>>
|
||||
PeerImp::getShardIndexes() const
|
||||
{
|
||||
std::lock_guard<std::mutex> l {shardInfoMutex_};
|
||||
auto it{shardInfo_.find(publicKey_)};
|
||||
if (it != shardInfo_.end())
|
||||
return it->second.shardIndexes;
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
boost::optional<hash_map<PublicKey, PeerImp::ShardInfo>>
|
||||
PeerImp::getPeerShardInfo() const
|
||||
{
|
||||
std::lock_guard<std::mutex> l {shardInfoMutex_};
|
||||
if (!shardInfo_.empty())
|
||||
return shardInfo_;
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::gracefulClose()
|
||||
{
|
||||
@@ -995,6 +1016,154 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMCluster> const& m)
|
||||
app_.getFeeTrack().setClusterFee(clusterFee);
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m)
|
||||
{
|
||||
fee_ = Resource::feeMediumBurdenPeer;
|
||||
|
||||
// Reply with shard info we may have
|
||||
if (auto shardStore = app_.getShardStore())
|
||||
{
|
||||
auto shards {shardStore->getCompleteShards()};
|
||||
if (!shards.empty())
|
||||
{
|
||||
protocol::TMShardInfo reply;
|
||||
auto const& publicKey {app_.nodeIdentity().first};
|
||||
reply.set_nodepubkey(publicKey.data(), publicKey.size());
|
||||
reply.set_shardindexes(shards);
|
||||
|
||||
if (m->has_lastlink())
|
||||
reply.set_lastlink(true);
|
||||
|
||||
if (m->peerchain_size() > 0)
|
||||
*reply.mutable_peerchain() = m->peerchain();
|
||||
|
||||
send(std::make_shared<Message>(reply, protocol::mtSHARD_INFO));
|
||||
|
||||
JLOG(p_journal_.trace()) <<
|
||||
"Sent shard info to peer with IP " <<
|
||||
remote_address_.address().to_string() <<
|
||||
" public key " << toBase58(TokenType::NodePublic, publicKey) <<
|
||||
" shard indexes " << shards;
|
||||
}
|
||||
}
|
||||
|
||||
// Relay request to peers
|
||||
if (m->hops() > 0)
|
||||
{
|
||||
m->set_hops(m->hops() - 1);
|
||||
if (m->hops() == 0)
|
||||
m->set_lastlink(true);
|
||||
|
||||
m->add_peerchain(id());
|
||||
overlay_.foreach(send_if_not(
|
||||
std::make_shared<Message>(*m, protocol::mtGET_SHARD_INFO),
|
||||
match_peer(this)));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr <protocol::TMShardInfo> const& m)
|
||||
{
|
||||
// Check if the message should be forwarded to another peer
|
||||
if (m->peerchain_size() > 0)
|
||||
{
|
||||
auto const peerId {m->peerchain(m->peerchain_size() - 1)};
|
||||
if (auto peer = overlay_.findPeerByShortID(peerId))
|
||||
{
|
||||
if (!m->has_endpoint())
|
||||
m->set_endpoint(remote_address_.address().to_string());
|
||||
|
||||
m->mutable_peerchain()->RemoveLast();
|
||||
peer->send(std::make_shared<Message>(*m, protocol::mtSHARD_INFO));
|
||||
|
||||
JLOG(p_journal_.trace()) <<
|
||||
"Relayed TMShardInfo to peer with IP " <<
|
||||
remote_address_.address().to_string();
|
||||
}
|
||||
else
|
||||
{
|
||||
// Peer is no longer available so the relay ends
|
||||
JLOG(p_journal_.info()) <<
|
||||
"Unable to route shard info";
|
||||
fee_ = Resource::feeUnwantedData;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Consume the shard info received
|
||||
if (m->shardindexes().empty())
|
||||
{
|
||||
JLOG(p_journal_.error()) <<
|
||||
"Node response missing shard indexes";
|
||||
return;
|
||||
}
|
||||
|
||||
// Get the IP of the node reporting the shard info
|
||||
beast::IP::Endpoint address;
|
||||
if (m->has_endpoint())
|
||||
{
|
||||
auto result {beast::IP::Endpoint::from_string_checked(m->endpoint())};
|
||||
if (!result.second)
|
||||
{
|
||||
JLOG(p_journal_.error()) <<
|
||||
"failed to parse incoming endpoint: {" <<
|
||||
m->endpoint() << "}";
|
||||
return;
|
||||
}
|
||||
address = std::move(result.first);
|
||||
}
|
||||
else
|
||||
address = remote_address_;
|
||||
|
||||
RangeSet<std::uint32_t>* shardIndexes {nullptr};
|
||||
PublicKey const publicKey(makeSlice(m->nodepubkey()));
|
||||
|
||||
std::lock_guard<std::mutex> l {shardInfoMutex_};
|
||||
auto it {shardInfo_.find(publicKey)};
|
||||
if (it != shardInfo_.end())
|
||||
{
|
||||
// Update the IP address for the node
|
||||
it->second.endpoint = address;
|
||||
|
||||
// Update the shard indexes held by the node
|
||||
shardIndexes = &(it->second.shardIndexes);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Add a new node
|
||||
ShardInfo shardInfo;
|
||||
shardInfo.endpoint = address;
|
||||
shardIndexes = &(shardInfo_.emplace(std::move(publicKey),
|
||||
std::move(shardInfo)).first->second.shardIndexes);
|
||||
}
|
||||
|
||||
// Parse shard indexes
|
||||
std::vector<std::string> tokens;
|
||||
boost::split(tokens, m->shardindexes(), boost::algorithm::is_any_of(","));
|
||||
for (auto const& t : tokens)
|
||||
{
|
||||
std::vector<std::string> seqs;
|
||||
boost::split(seqs, t, boost::algorithm::is_any_of("-"));
|
||||
if (seqs.size() == 1)
|
||||
shardIndexes->insert(
|
||||
beast::lexicalCastThrow<std::uint32_t>(seqs.front()));
|
||||
else if (seqs.size() == 2)
|
||||
shardIndexes->insert(range(
|
||||
beast::lexicalCastThrow<std::uint32_t>(seqs.front()),
|
||||
beast::lexicalCastThrow<std::uint32_t>(seqs.back())));
|
||||
}
|
||||
|
||||
JLOG(p_journal_.trace()) <<
|
||||
"Consumed TMShardInfo originating from peer with IP " <<
|
||||
address.address().to_string() <<
|
||||
" public key " << toBase58(TokenType::NodePublic, publicKey) <<
|
||||
" shard indexes " << to_string(*shardIndexes);
|
||||
|
||||
if (m->has_lastlink())
|
||||
overlay_.lastLink(id_);
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::onMessage (std::shared_ptr <protocol::TMGetPeers> const& m)
|
||||
{
|
||||
@@ -1414,26 +1583,6 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
|
||||
minLedger_ = 0;
|
||||
}
|
||||
|
||||
if (m->has_shardseqs())
|
||||
{
|
||||
std::vector<std::string> tokens;
|
||||
boost::split(tokens, m->shardseqs(), boost::algorithm::is_any_of(","));
|
||||
std::lock_guard<std::mutex> sl(recentLock_);
|
||||
shards_.clear();
|
||||
for (auto const& t : tokens)
|
||||
{
|
||||
std::vector<std::string> seqs;
|
||||
boost::split(seqs, t, boost::algorithm::is_any_of("-"));
|
||||
if (seqs.size() == 1)
|
||||
shards_.insert(
|
||||
beast::lexicalCastThrow<std::uint32_t>(seqs.front()));
|
||||
else if (seqs.size() == 2)
|
||||
shards_.insert(range(
|
||||
beast::lexicalCastThrow<std::uint32_t>(seqs.front()),
|
||||
beast::lexicalCastThrow<std::uint32_t>(seqs.back())));
|
||||
}
|
||||
}
|
||||
|
||||
if (m->has_ledgerseq() &&
|
||||
app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
|
||||
{
|
||||
@@ -1509,9 +1658,6 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
|
||||
Json::UInt (m->lastseq ());
|
||||
}
|
||||
|
||||
if (m->has_shardseqs())
|
||||
j[jss::complete_shards] = m->shardseqs();
|
||||
|
||||
return j;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -22,20 +22,23 @@
|
||||
|
||||
#include <ripple/app/consensus/RCLCxPeerPos.h>
|
||||
#include <ripple/basics/Log.h>
|
||||
#include <ripple/beast/utility/WrappedSink.h>
|
||||
#include <ripple/basics/RangeSet.h>
|
||||
#include <ripple/beast/utility/WrappedSink.h>
|
||||
#include <ripple/overlay/impl/ProtocolMessage.h>
|
||||
#include <ripple/overlay/impl/OverlayImpl.h>
|
||||
#include <ripple/peerfinder/PeerfinderManager.h>
|
||||
#include <ripple/protocol/Protocol.h>
|
||||
#include <ripple/protocol/STTx.h>
|
||||
#include <ripple/protocol/STValidation.h>
|
||||
#include <ripple/resource/Fees.h>
|
||||
|
||||
#include <boost/endian/conversion.hpp>
|
||||
#include <boost/optional.hpp>
|
||||
#include <cstdint>
|
||||
#include <deque>
|
||||
#include <queue>
|
||||
|
||||
|
||||
namespace ripple {
|
||||
|
||||
class PeerImp
|
||||
@@ -77,6 +80,12 @@ public:
|
||||
,sane
|
||||
};
|
||||
|
||||
struct ShardInfo
|
||||
{
|
||||
beast::IP::Endpoint endpoint;
|
||||
RangeSet<std::uint32_t> shardIndexes;
|
||||
};
|
||||
|
||||
using ptr = std::shared_ptr <PeerImp>;
|
||||
|
||||
private:
|
||||
@@ -107,7 +116,7 @@ private:
|
||||
|
||||
// Updated at each stage of the connection process to reflect
|
||||
// the current conditions as closely as possible.
|
||||
beast::IP::Endpoint remote_address_;
|
||||
beast::IP::Endpoint const remote_address_;
|
||||
|
||||
// These are up here to prevent warnings about order of initializations
|
||||
//
|
||||
@@ -126,7 +135,6 @@ private:
|
||||
//
|
||||
LedgerIndex minLedger_ = 0;
|
||||
LedgerIndex maxLedger_ = 0;
|
||||
RangeSet<std::uint32_t> shards_;
|
||||
uint256 closedLedgerHash_;
|
||||
uint256 previousLedgerHash_;
|
||||
std::deque<uint256> recentLedgers_;
|
||||
@@ -155,6 +163,9 @@ private:
|
||||
std::unique_ptr <LoadEvent> load_event_;
|
||||
bool hopsAware_ = false;
|
||||
|
||||
std::mutex mutable shardInfoMutex_;
|
||||
hash_map<PublicKey, ShardInfo> shardInfo_;
|
||||
|
||||
friend class OverlayImpl;
|
||||
|
||||
public:
|
||||
@@ -235,6 +246,7 @@ public:
|
||||
return id_;
|
||||
}
|
||||
|
||||
/** Returns `true` if this connection will publicly share its IP address. */
|
||||
bool
|
||||
crawl() const;
|
||||
|
||||
@@ -301,9 +313,6 @@ public:
|
||||
bool
|
||||
hasShard (std::uint32_t shardIndex) const override;
|
||||
|
||||
std::string
|
||||
getShards () const override;
|
||||
|
||||
bool
|
||||
hasTxSet (uint256 const& hash) const override;
|
||||
|
||||
@@ -326,6 +335,14 @@ public:
|
||||
void
|
||||
fail(std::string const& reason);
|
||||
|
||||
/** Return a range set of known shard indexes from this peer. */
|
||||
boost::optional<RangeSet<std::uint32_t>>
|
||||
getShardIndexes() const;
|
||||
|
||||
/** Return any known shard info from this peer and its sub peers. */
|
||||
boost::optional<hash_map<PublicKey, ShardInfo>>
|
||||
getPeerShardInfo() const;
|
||||
|
||||
private:
|
||||
void
|
||||
close();
|
||||
@@ -412,6 +429,8 @@ public:
|
||||
void onMessage (std::shared_ptr <protocol::TMManifests> const& m);
|
||||
void onMessage (std::shared_ptr <protocol::TMPing> const& m);
|
||||
void onMessage (std::shared_ptr <protocol::TMCluster> const& m);
|
||||
void onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m);
|
||||
void onMessage (std::shared_ptr <protocol::TMShardInfo> const& m);
|
||||
void onMessage (std::shared_ptr <protocol::TMGetPeers> const& m);
|
||||
void onMessage (std::shared_ptr <protocol::TMPeers> const& m);
|
||||
void onMessage (std::shared_ptr <protocol::TMEndpoints> const& m);
|
||||
|
||||
@@ -46,6 +46,8 @@ protocolMessageName (int type)
|
||||
case protocol::mtPING: return "ping";
|
||||
case protocol::mtPROOFOFWORK: return "proof_of_work";
|
||||
case protocol::mtCLUSTER: return "cluster";
|
||||
case protocol::mtGET_SHARD_INFO: return "get_shard_info";
|
||||
case protocol::mtSHARD_INFO: return "shard_info";
|
||||
case protocol::mtGET_PEERS: return "get_peers";
|
||||
case protocol::mtPEERS: return "peers";
|
||||
case protocol::mtENDPOINTS: return "endpoints";
|
||||
@@ -117,6 +119,8 @@ invokeProtocolMessage (Buffers const& buffers, Handler& handler)
|
||||
case protocol::mtMANIFESTS: ec = detail::invoke<protocol::TMManifests> (type, buffers, handler); break;
|
||||
case protocol::mtPING: ec = detail::invoke<protocol::TMPing> (type, buffers, handler); break;
|
||||
case protocol::mtCLUSTER: ec = detail::invoke<protocol::TMCluster> (type, buffers, handler); break;
|
||||
case protocol::mtGET_SHARD_INFO:ec = detail::invoke<protocol::TMGetShardInfo> (type, buffers, handler); break;
|
||||
case protocol::mtSHARD_INFO: ec = detail::invoke<protocol::TMShardInfo>(type, buffers, handler); break;
|
||||
case protocol::mtGET_PEERS: ec = detail::invoke<protocol::TMGetPeers> (type, buffers, handler); break;
|
||||
case protocol::mtPEERS: ec = detail::invoke<protocol::TMPeers> (type, buffers, handler); break;
|
||||
case protocol::mtENDPOINTS: ec = detail::invoke<protocol::TMEndpoints> (type, buffers, handler); break;
|
||||
|
||||
@@ -64,6 +64,8 @@ TrafficCount::category TrafficCount::categorize (
|
||||
|
||||
if ((type == protocol::mtMANIFESTS) ||
|
||||
(type == protocol::mtENDPOINTS) ||
|
||||
(type == protocol::mtGET_SHARD_INFO) ||
|
||||
(type == protocol::mtSHARD_INFO) ||
|
||||
(type == protocol::mtPEERS) ||
|
||||
(type == protocol::mtGET_PEERS))
|
||||
return TrafficCount::category::CT_overlay;
|
||||
|
||||
@@ -8,6 +8,8 @@ enum MessageType
|
||||
mtPING = 3;
|
||||
mtPROOFOFWORK = 4;
|
||||
mtCLUSTER = 5;
|
||||
mtGET_SHARD_INFO = 10;
|
||||
mtSHARD_INFO = 11;
|
||||
mtGET_PEERS = 12;
|
||||
mtPEERS = 13;
|
||||
mtENDPOINTS = 15;
|
||||
@@ -20,8 +22,6 @@ enum MessageType
|
||||
mtVALIDATION = 41;
|
||||
mtGET_OBJECTS = 42;
|
||||
|
||||
// <available> = 10;
|
||||
// <available> = 11;
|
||||
// <available> = 14;
|
||||
// <available> = 20;
|
||||
// <available> = 21;
|
||||
@@ -127,6 +127,23 @@ message TMCluster
|
||||
repeated TMLoadSource loadSources = 2;
|
||||
}
|
||||
|
||||
// Request info on shards held
|
||||
message TMGetShardInfo
|
||||
{
|
||||
required uint32 hops = 1; // number of hops to travel
|
||||
optional bool lastLink = 2; // true if last link in the peer chain
|
||||
repeated uint32 peerchain = 3;
|
||||
}
|
||||
|
||||
// Info about shards held
|
||||
message TMShardInfo
|
||||
{
|
||||
required bytes nodePubKey = 1;
|
||||
required string shardIndexes = 2; // rangeSet of shard indexes
|
||||
optional string endpoint = 3; // ipv6 or ipv4 address
|
||||
optional bool lastLink = 4; // true if last link in the peer chain
|
||||
repeated uint32 peerchain = 5;
|
||||
}
|
||||
|
||||
// A transaction can have only one input and one output.
|
||||
// If you want to send an amount that is greater than any single address of yours
|
||||
@@ -180,7 +197,6 @@ message TMStatusChange
|
||||
optional uint64 networkTime = 6;
|
||||
optional uint32 firstSeq = 7;
|
||||
optional uint32 lastSeq = 8;
|
||||
optional string shardSeqs = 9;
|
||||
}
|
||||
|
||||
|
||||
|
||||
82
src/ripple/rpc/handlers/CrawlShards.cpp
Normal file
82
src/ripple/rpc/handlers/CrawlShards.cpp
Normal file
@@ -0,0 +1,82 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2018 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <ripple/app/main/Application.h>
|
||||
#include <ripple/net/RPCErr.h>
|
||||
#include <ripple/nodestore/DatabaseShard.h>
|
||||
#include <ripple/overlay/Overlay.h>
|
||||
#include <ripple/protocol/JsonFields.h>
|
||||
#include <ripple/resource/Fees.h>
|
||||
#include <ripple/rpc/Context.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
static std::uint32_t constexpr hopLimit = 3;
|
||||
|
||||
/** RPC command that reports stored shards by nodes.
|
||||
{
|
||||
// Determines if the result includes node public key.
|
||||
// optional, default is false
|
||||
pubkey: <bool>
|
||||
|
||||
// The maximum number of peer hops to attempt.
|
||||
// optional, default is zero, maximum is 3
|
||||
limit: <integer>
|
||||
}
|
||||
*/
|
||||
Json::Value
|
||||
doCrawlShards(RPC::Context& context)
|
||||
{
|
||||
std::uint32_t hops {0};
|
||||
if (auto const& jv = context.params[jss::limit])
|
||||
{
|
||||
if (!(jv.isUInt() || (jv.isInt() && jv.asInt() >= 0)))
|
||||
{
|
||||
return RPC::expected_field_error(
|
||||
jss::limit, "unsigned integer");
|
||||
}
|
||||
|
||||
hops = std::min(jv.asUInt(), hopLimit);
|
||||
}
|
||||
|
||||
bool const pubKey {context.params.isMember(jss::public_key) &&
|
||||
context.params[jss::public_key].asBool()};
|
||||
|
||||
// Collect shard info from peers connected to this server
|
||||
Json::Value jvResult {context.app.overlay().crawlShards(pubKey, hops)};
|
||||
|
||||
// Collect shard info from this server
|
||||
if (auto shardStore = context.app.getShardStore())
|
||||
{
|
||||
if (pubKey)
|
||||
jvResult[jss::public_key] = toBase58(
|
||||
TokenType::NodePublic, context.app.nodeIdentity().first);
|
||||
jvResult[jss::complete_shards] = std::move(
|
||||
shardStore->getCompleteShards());
|
||||
}
|
||||
|
||||
if (hops == 0)
|
||||
context.loadType = Resource::feeMediumBurdenRPC;
|
||||
else
|
||||
context.loadType = Resource::feeHighBurdenRPC;
|
||||
|
||||
return jvResult;
|
||||
}
|
||||
|
||||
} // ripple
|
||||
@@ -69,6 +69,7 @@ Json::Value doServerInfo (RPC::Context&); // for humans
|
||||
Json::Value doServerState (RPC::Context&); // for machines
|
||||
Json::Value doSign (RPC::Context&);
|
||||
Json::Value doSignFor (RPC::Context&);
|
||||
Json::Value doCrawlShards (RPC::Context&);
|
||||
Json::Value doStop (RPC::Context&);
|
||||
Json::Value doSubmit (RPC::Context&);
|
||||
Json::Value doSubmitMultiSigned (RPC::Context&);
|
||||
|
||||
@@ -104,6 +104,7 @@ Handler const handlerArray[] {
|
||||
{ "submit_multisigned", byRef (&doSubmitMultiSigned), Role::USER, NEEDS_CURRENT_LEDGER },
|
||||
{ "server_info", byRef (&doServerInfo), Role::USER, NO_CONDITION },
|
||||
{ "server_state", byRef (&doServerState), Role::USER, NO_CONDITION },
|
||||
{ "crawl_shards", byRef (&doCrawlShards), Role::USER, NO_CONDITION },
|
||||
{ "stop", byRef (&doStop), Role::ADMIN, NO_CONDITION },
|
||||
{ "transaction_entry", byRef (&doTransactionEntry), Role::USER, NO_CONDITION },
|
||||
{ "tx", byRef (&doTx), Role::USER, NEEDS_NETWORK_CONNECTION },
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
#include <ripple/rpc/handlers/ServerState.cpp>
|
||||
#include <ripple/rpc/handlers/SignFor.cpp>
|
||||
#include <ripple/rpc/handlers/SignHandler.cpp>
|
||||
#include <ripple/rpc/handlers/CrawlShards.cpp>
|
||||
#include <ripple/rpc/handlers/Stop.cpp>
|
||||
#include <ripple/rpc/handlers/Submit.cpp>
|
||||
#include <ripple/rpc/handlers/SubmitMultiSigned.cpp>
|
||||
|
||||
Reference in New Issue
Block a user