Use public key when routing shard crawl requests

This commit is contained in:
Miguel Portilla
2019-04-10 18:11:22 -04:00
committed by Nik Bougalis
parent 0329ee236f
commit 2e26377e7c
9 changed files with 170 additions and 92 deletions

View File

@@ -517,12 +517,12 @@ DatabaseShardImp::setStored(std::shared_ptr<Ledger const> const& ledger)
updateStats(l);
// Update peers with new shard index
protocol::TMShardInfo message;
protocol::TMPeerShardInfo message;
PublicKey const& publicKey {app_.nodeIdentity().first};
message.set_nodepubkey(publicKey.data(), publicKey.size());
message.set_shardindexes(std::to_string(shardIndex));
app_.overlay().foreach(send_always(
std::make_shared<Message>(message, protocol::mtSHARD_INFO)));
std::make_shared<Message>(message, protocol::mtPEER_SHARD_INFO)));
}
}

View File

@@ -137,6 +137,11 @@ public:
std::shared_ptr<Peer>
findPeerByShortID (Peer::id_t const& id) = 0;
/** Returns the peer with the matching public key, or null. */
virtual
std::shared_ptr<Peer>
findPeerByPublicKey (PublicKey const& pubKey) = 0;
/** Broadcast a proposal. */
virtual
void

View File

@@ -770,10 +770,10 @@ OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops)
}
// Relay request to active peers
protocol::TMGetShardInfo tmGS;
tmGS.set_hops(hops);
protocol::TMGetPeerShardInfo tmGPS;
tmGPS.set_hops(hops);
foreach(send_always(std::make_shared<Message>(
tmGS, protocol::mtGET_SHARD_INFO)));
tmGPS, protocol::mtGET_PEER_SHARD_INFO)));
if (csCV_.wait_for(l, timeout) == std::cv_status::timeout)
{
@@ -1083,6 +1083,23 @@ OverlayImpl::findPeerByShortID (Peer::id_t const& id)
return {};
}
// A public key hash map was not used due to the peer connect/disconnect
// update overhead outweighing the performance of a small set linear search.
std::shared_ptr<Peer>
OverlayImpl::findPeerByPublicKey (PublicKey const& pubKey)
{
std::lock_guard <decltype(mutex_)> lock(mutex_);
for (auto const& e : ids_)
{
if (auto peer = e.second.lock())
{
if (peer->getNodePublic() == pubKey)
return peer;
}
}
return {};
}
void
OverlayImpl::send (protocol::TMProposeSet& m)
{

View File

@@ -196,6 +196,9 @@ public:
std::shared_ptr<Peer>
findPeerByShortID (Peer::id_t const& id) override;
std::shared_ptr<Peer>
findPeerByPublicKey (PublicKey const& pubKey) override;
void
send (protocol::TMProposeSet& m) override;

View File

@@ -148,9 +148,9 @@ PeerImp::run()
}
// Request shard info from peer
protocol::TMGetShardInfo tmGS;
tmGS.set_hops(0);
send(std::make_shared<Message>(tmGS, protocol::mtGET_SHARD_INFO));
protocol::TMGetPeerShardInfo tmGPS;
tmGPS.set_hops(0);
send(std::make_shared<Message>(tmGPS, protocol::mtGET_PEER_SHARD_INFO));
setTimer();
}
@@ -1019,17 +1019,29 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMCluster> const& m)
}
void
PeerImp::onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m)
PeerImp::onMessage(std::shared_ptr <protocol::TMGetShardInfo> const& m)
{
if (m->hops() > csHopLimit || m->peerchain_size() > csHopLimit)
{
fee_ = Resource::feeInvalidRequest;
JLOG(p_journal_.warn()) <<
(m->hops() > csHopLimit ?
"Hops (" + std::to_string(m->hops()) + ") exceed limit" :
"Invalid Peerchain");
return;
}
// DEPRECATED
}
void
PeerImp::onMessage(std::shared_ptr <protocol::TMShardInfo> const& m)
{
// DEPRECATED
}
void
PeerImp::onMessage (std::shared_ptr <protocol::TMGetPeerShardInfo> const& m)
{
auto badData = [&](std::string msg) {
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) << msg;
};
if (m->hops() > csHopLimit)
return badData("Invalid hops: " + std::to_string(m->hops()));
if (m->peerchain_size() > csHopLimit)
return badData("Invalid peer chain");
// Reply with shard info we may have
if (auto shardStore = app_.getShardStore())
@@ -1038,7 +1050,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m)
auto shards {shardStore->getCompleteShards()};
if (!shards.empty())
{
protocol::TMShardInfo reply;
protocol::TMPeerShardInfo reply;
reply.set_shardindexes(shards);
if (m->has_lastlink())
@@ -1047,7 +1059,8 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m)
if (m->peerchain_size() > 0)
*reply.mutable_peerchain() = m->peerchain();
send(std::make_shared<Message>(reply, protocol::mtSHARD_INFO));
send(std::make_shared<Message>(
reply, protocol::mtPEER_SHARD_INFO));
JLOG(p_journal_.trace()) <<
"Sent shard indexes " << shards;
@@ -1063,31 +1076,41 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m)
if (m->hops() == 0)
m->set_lastlink(true);
m->add_peerchain(id());
m->add_peerchain()->set_nodepubkey(
publicKey_.data(), publicKey_.size());
overlay_.foreach(send_if_not(
std::make_shared<Message>(*m, protocol::mtGET_SHARD_INFO),
std::make_shared<Message>(*m, protocol::mtGET_PEER_SHARD_INFO),
match_peer(this)));
}
}
void
PeerImp::onMessage(std::shared_ptr <protocol::TMShardInfo> const& m)
PeerImp::onMessage(std::shared_ptr <protocol::TMPeerShardInfo> const& m)
{
if (m->shardindexes().empty() || m->peerchain_size() > csHopLimit)
{
auto badData = [&](std::string msg) {
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) <<
(m->shardindexes().empty() ?
"Missing shard indexes" :
"Invalid Peerchain");
return;
}
JLOG(p_journal_.warn()) << msg;
};
if (m->shardindexes().empty())
return badData("Missing shard indexes");
if (m->peerchain_size() > csHopLimit)
return badData("Invalid peer chain");
if (m->has_nodepubkey() && !publicKeyType(makeSlice(m->nodepubkey())))
return badData("Invalid public key");
// Check if the message should be forwarded to another peer
if (m->peerchain_size() > 0)
{
auto const peerId {m->peerchain(m->peerchain_size() - 1)};
if (auto peer = overlay_.findPeerByShortID(peerId))
// Get the Public key of the last link in the peer chain
auto const s {makeSlice(m->peerchain(
m->peerchain_size() - 1).nodepubkey())};
if (!publicKeyType(s))
return badData("Invalid pubKey");
PublicKey peerPubKey(s);
if (auto peer = overlay_.findPeerByPublicKey(peerPubKey))
{
if (!m->has_nodepubkey())
m->set_nodepubkey(publicKey_.data(), publicKey_.size());
@@ -1102,10 +1125,11 @@ PeerImp::onMessage(std::shared_ptr <protocol::TMShardInfo> const& m)
}
m->mutable_peerchain()->RemoveLast();
peer->send(std::make_shared<Message>(*m, protocol::mtSHARD_INFO));
peer->send(std::make_shared<Message>(
*m, protocol::mtPEER_SHARD_INFO));
JLOG(p_journal_.trace()) <<
"Relayed TMShardInfo to peer with IP " <<
"Relayed TMPeerShardInfo to peer with IP " <<
remote_address_.address().to_string();
}
else
@@ -1191,19 +1215,11 @@ PeerImp::onMessage(std::shared_ptr <protocol::TMShardInfo> const& m)
break;
}
default:
fee_ = Resource::feeBadData;
return;
return badData("Invalid shard indexes");
}
}
}
// Get the Public key of the node reporting the shard info
PublicKey publicKey;
if (m->has_nodepubkey())
publicKey = PublicKey(makeSlice(m->nodepubkey()));
else
publicKey = publicKey_;
// Get the IP of the node reporting the shard info
beast::IP::Endpoint endpoint;
if (m->has_endpoint())
@@ -1213,18 +1229,21 @@ PeerImp::onMessage(std::shared_ptr <protocol::TMShardInfo> const& m)
auto result {
beast::IP::Endpoint::from_string_checked(m->endpoint())};
if (!result.second)
{
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) <<
"failed to parse incoming endpoint: {" <<
m->endpoint() << "}";
return;
}
return badData("Invalid incoming endpoint: " + m->endpoint());
endpoint = std::move(result.first);
}
}
else if (crawl()) // Check if peer will share IP publicly
{
endpoint = remote_address_;
}
// Get the Public key of the node reporting the shard info
PublicKey publicKey;
if (m->has_nodepubkey())
publicKey = PublicKey(makeSlice(m->nodepubkey()));
else
publicKey = publicKey_;
{
std::lock_guard<std::mutex> l {shardInfoMutex_};
@@ -1248,7 +1267,7 @@ PeerImp::onMessage(std::shared_ptr <protocol::TMShardInfo> const& m)
}
JLOG(p_journal_.trace()) <<
"Consumed TMShardInfo originating from public key " <<
"Consumed TMPeerShardInfo originating from public key " <<
toBase58(TokenType::NodePublic, publicKey) <<
" shard indexes " << m->shardindexes();

View File

@@ -430,6 +430,8 @@ public:
void onMessage (std::shared_ptr <protocol::TMCluster> const& m);
void onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m);
void onMessage (std::shared_ptr <protocol::TMShardInfo> const& m);
void onMessage (std::shared_ptr <protocol::TMGetPeerShardInfo> const& m);
void onMessage (std::shared_ptr <protocol::TMPeerShardInfo> const& m);
void onMessage (std::shared_ptr <protocol::TMGetPeers> const& m);
void onMessage (std::shared_ptr <protocol::TMPeers> const& m);
void onMessage (std::shared_ptr <protocol::TMEndpoints> const& m);

View File

@@ -41,24 +41,26 @@ protocolMessageName (int type)
{
switch (type)
{
case protocol::mtHELLO: return "hello";
case protocol::mtMANIFESTS: return "manifests";
case protocol::mtPING: return "ping";
case protocol::mtPROOFOFWORK: return "proof_of_work";
case protocol::mtCLUSTER: return "cluster";
case protocol::mtGET_SHARD_INFO: return "get_shard_info";
case protocol::mtSHARD_INFO: return "shard_info";
case protocol::mtGET_PEERS: return "get_peers";
case protocol::mtPEERS: return "peers";
case protocol::mtENDPOINTS: return "endpoints";
case protocol::mtTRANSACTION: return "tx";
case protocol::mtGET_LEDGER: return "get_ledger";
case protocol::mtLEDGER_DATA: return "ledger_data";
case protocol::mtPROPOSE_LEDGER: return "propose";
case protocol::mtSTATUS_CHANGE: return "status";
case protocol::mtHAVE_SET: return "have_set";
case protocol::mtVALIDATION: return "validation";
case protocol::mtGET_OBJECTS: return "get_objects";
case protocol::mtHELLO: return "hello";
case protocol::mtMANIFESTS: return "manifests";
case protocol::mtPING: return "ping";
case protocol::mtPROOFOFWORK: return "proof_of_work";
case protocol::mtCLUSTER: return "cluster";
case protocol::mtGET_SHARD_INFO: return "get_shard_info";
case protocol::mtSHARD_INFO: return "shard_info";
case protocol::mtGET_PEER_SHARD_INFO: return "get_peer_shard_info";
case protocol::mtPEER_SHARD_INFO: return "peer_shard_info";
case protocol::mtGET_PEERS: return "get_peers";
case protocol::mtPEERS: return "peers";
case protocol::mtENDPOINTS: return "endpoints";
case protocol::mtTRANSACTION: return "tx";
case protocol::mtGET_LEDGER: return "get_ledger";
case protocol::mtLEDGER_DATA: return "ledger_data";
case protocol::mtPROPOSE_LEDGER: return "propose";
case protocol::mtSTATUS_CHANGE: return "status";
case protocol::mtHAVE_SET: return "have_set";
case protocol::mtVALIDATION: return "validation";
case protocol::mtGET_OBJECTS: return "get_objects";
default:
break;
};
@@ -128,23 +130,25 @@ invokeProtocolMessage (Buffers const& buffers, Handler& handler)
switch (type)
{
case protocol::mtHELLO: ec = detail::invoke<protocol::TMHello> (type, buffers, handler); break;
case protocol::mtMANIFESTS: ec = detail::invoke<protocol::TMManifests> (type, buffers, handler); break;
case protocol::mtPING: ec = detail::invoke<protocol::TMPing> (type, buffers, handler); break;
case protocol::mtCLUSTER: ec = detail::invoke<protocol::TMCluster> (type, buffers, handler); break;
case protocol::mtGET_SHARD_INFO:ec = detail::invoke<protocol::TMGetShardInfo> (type, buffers, handler); break;
case protocol::mtSHARD_INFO: ec = detail::invoke<protocol::TMShardInfo>(type, buffers, handler); break;
case protocol::mtGET_PEERS: ec = detail::invoke<protocol::TMGetPeers> (type, buffers, handler); break;
case protocol::mtPEERS: ec = detail::invoke<protocol::TMPeers> (type, buffers, handler); break;
case protocol::mtENDPOINTS: ec = detail::invoke<protocol::TMEndpoints> (type, buffers, handler); break;
case protocol::mtTRANSACTION: ec = detail::invoke<protocol::TMTransaction> (type, buffers, handler); break;
case protocol::mtGET_LEDGER: ec = detail::invoke<protocol::TMGetLedger> (type, buffers, handler); break;
case protocol::mtLEDGER_DATA: ec = detail::invoke<protocol::TMLedgerData> (type, buffers, handler); break;
case protocol::mtPROPOSE_LEDGER:ec = detail::invoke<protocol::TMProposeSet> (type, buffers, handler); break;
case protocol::mtSTATUS_CHANGE: ec = detail::invoke<protocol::TMStatusChange> (type, buffers, handler); break;
case protocol::mtHAVE_SET: ec = detail::invoke<protocol::TMHaveTransactionSet> (type, buffers, handler); break;
case protocol::mtVALIDATION: ec = detail::invoke<protocol::TMValidation> (type, buffers, handler); break;
case protocol::mtGET_OBJECTS: ec = detail::invoke<protocol::TMGetObjectByHash> (type, buffers, handler); break;
case protocol::mtHELLO: ec = detail::invoke<protocol::TMHello> (type, buffers, handler); break;
case protocol::mtMANIFESTS: ec = detail::invoke<protocol::TMManifests> (type, buffers, handler); break;
case protocol::mtPING: ec = detail::invoke<protocol::TMPing> (type, buffers, handler); break;
case protocol::mtCLUSTER: ec = detail::invoke<protocol::TMCluster> (type, buffers, handler); break;
case protocol::mtGET_SHARD_INFO: ec = detail::invoke<protocol::TMGetShardInfo> (type, buffers, handler); break;
case protocol::mtSHARD_INFO: ec = detail::invoke<protocol::TMShardInfo>(type, buffers, handler); break;
case protocol::mtGET_PEER_SHARD_INFO: ec = detail::invoke<protocol::TMGetPeerShardInfo> (type, buffers, handler); break;
case protocol::mtPEER_SHARD_INFO: ec = detail::invoke<protocol::TMPeerShardInfo>(type, buffers, handler); break;
case protocol::mtGET_PEERS: ec = detail::invoke<protocol::TMGetPeers> (type, buffers, handler); break;
case protocol::mtPEERS: ec = detail::invoke<protocol::TMPeers> (type, buffers, handler); break;
case protocol::mtENDPOINTS: ec = detail::invoke<protocol::TMEndpoints> (type, buffers, handler); break;
case protocol::mtTRANSACTION: ec = detail::invoke<protocol::TMTransaction> (type, buffers, handler); break;
case protocol::mtGET_LEDGER: ec = detail::invoke<protocol::TMGetLedger> (type, buffers, handler); break;
case protocol::mtLEDGER_DATA: ec = detail::invoke<protocol::TMLedgerData> (type, buffers, handler); break;
case protocol::mtPROPOSE_LEDGER: ec = detail::invoke<protocol::TMProposeSet> (type, buffers, handler); break;
case protocol::mtSTATUS_CHANGE: ec = detail::invoke<protocol::TMStatusChange> (type, buffers, handler); break;
case protocol::mtHAVE_SET: ec = detail::invoke<protocol::TMHaveTransactionSet> (type, buffers, handler); break;
case protocol::mtVALIDATION: ec = detail::invoke<protocol::TMValidation> (type, buffers, handler); break;
case protocol::mtGET_OBJECTS: ec = detail::invoke<protocol::TMGetObjectByHash> (type, buffers, handler); break;
default:
ec = handler.onMessageUnknown (type);
break;

View File

@@ -66,6 +66,8 @@ TrafficCount::category TrafficCount::categorize (
(type == protocol::mtENDPOINTS) ||
(type == protocol::mtGET_SHARD_INFO) ||
(type == protocol::mtSHARD_INFO) ||
(type == protocol::mtGET_PEER_SHARD_INFO) ||
(type == protocol::mtPEER_SHARD_INFO) ||
(type == protocol::mtPEERS) ||
(type == protocol::mtGET_PEERS))
return TrafficCount::category::CT_overlay;

View File

@@ -21,6 +21,8 @@ enum MessageType
mtGET_OBJECTS = 42;
mtGET_SHARD_INFO = 50;
mtSHARD_INFO = 51;
mtGET_PEER_SHARD_INFO = 52;
mtPEER_SHARD_INFO = 53;
// <available> = 10;
// <available> = 11;
@@ -132,19 +134,43 @@ message TMCluster
// Request info on shards held
message TMGetShardInfo
{
required uint32 hops = 1; // number of hops to travel
optional bool lastLink = 2; // true if last link in the peer chain
repeated uint32 peerchain = 3;
required uint32 hops = 1 [deprecated=true]; // number of hops to travel
optional bool lastLink = 2 [deprecated=true]; // true if last link in the peer chain
repeated uint32 peerchain = 3 [deprecated=true]; // IDs used to route messages
}
// Info about shards held
message TMShardInfo
{
required string shardIndexes = 1 [deprecated=true]; // rangeSet of shard indexes
optional bytes nodePubKey = 2 [deprecated=true]; // The node's public key
optional string endpoint = 3 [deprecated=true]; // ipv6 or ipv4 address
optional bool lastLink = 4 [deprecated=true]; // true if last link in the peer chain
repeated uint32 peerchain = 5 [deprecated=true]; // IDs used to route messages
}
// Node public key
message TMLink
{
required bytes nodePubKey = 1; // node public key
}
// Request info on shards held
message TMGetPeerShardInfo
{
required uint32 hops = 1; // number of hops to travel
optional bool lastLink = 2; // true if last link in the peer chain
repeated TMLink peerChain = 3; // public keys used to route messages
}
// Info about shards held
message TMPeerShardInfo
{
required string shardIndexes = 1; // rangeSet of shard indexes
optional bytes nodePubKey = 2; // The node's public key
optional bytes nodePubKey = 2; // node public key
optional string endpoint = 3; // ipv6 or ipv4 address
optional bool lastLink = 4; // true if last link in the peer chain
repeated uint32 peerchain = 5; // List of IDs used to route messages
repeated TMLink peerChain = 5; // public keys used to route messages
}
// A transaction can have only one input and one output.