diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 3667b7cdf8..3939dae5bb 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -517,12 +517,12 @@ DatabaseShardImp::setStored(std::shared_ptr const& ledger) updateStats(l); // Update peers with new shard index - protocol::TMShardInfo message; + protocol::TMPeerShardInfo message; PublicKey const& publicKey {app_.nodeIdentity().first}; message.set_nodepubkey(publicKey.data(), publicKey.size()); message.set_shardindexes(std::to_string(shardIndex)); app_.overlay().foreach(send_always( - std::make_shared(message, protocol::mtSHARD_INFO))); + std::make_shared(message, protocol::mtPEER_SHARD_INFO))); } } diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index abdaced77c..af7821363f 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -137,6 +137,11 @@ public: std::shared_ptr findPeerByShortID (Peer::id_t const& id) = 0; + /** Returns the peer with the matching public key, or null. */ + virtual + std::shared_ptr + findPeerByPublicKey (PublicKey const& pubKey) = 0; + /** Broadcast a proposal. */ virtual void diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index f948cc3de7..148e03f693 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -770,10 +770,10 @@ OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops) } // Relay request to active peers - protocol::TMGetShardInfo tmGS; - tmGS.set_hops(hops); + protocol::TMGetPeerShardInfo tmGPS; + tmGPS.set_hops(hops); foreach(send_always(std::make_shared( - tmGS, protocol::mtGET_SHARD_INFO))); + tmGPS, protocol::mtGET_PEER_SHARD_INFO))); if (csCV_.wait_for(l, timeout) == std::cv_status::timeout) { @@ -1083,6 +1083,23 @@ OverlayImpl::findPeerByShortID (Peer::id_t const& id) return {}; } +// A public key hash map was not used due to the peer connect/disconnect +// update overhead outweighing the performance of a small set linear search. +std::shared_ptr +OverlayImpl::findPeerByPublicKey (PublicKey const& pubKey) +{ + std::lock_guard lock(mutex_); + for (auto const& e : ids_) + { + if (auto peer = e.second.lock()) + { + if (peer->getNodePublic() == pubKey) + return peer; + } + } + return {}; +} + void OverlayImpl::send (protocol::TMProposeSet& m) { diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 4c81807f9a..8a187ff64a 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -196,6 +196,9 @@ public: std::shared_ptr findPeerByShortID (Peer::id_t const& id) override; + std::shared_ptr + findPeerByPublicKey (PublicKey const& pubKey) override; + void send (protocol::TMProposeSet& m) override; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 3dbc6e289c..f0ab94a4f2 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -148,9 +148,9 @@ PeerImp::run() } // Request shard info from peer - protocol::TMGetShardInfo tmGS; - tmGS.set_hops(0); - send(std::make_shared(tmGS, protocol::mtGET_SHARD_INFO)); + protocol::TMGetPeerShardInfo tmGPS; + tmGPS.set_hops(0); + send(std::make_shared(tmGPS, protocol::mtGET_PEER_SHARD_INFO)); setTimer(); } @@ -1019,17 +1019,29 @@ PeerImp::onMessage (std::shared_ptr const& m) } void -PeerImp::onMessage (std::shared_ptr const& m) +PeerImp::onMessage(std::shared_ptr const& m) { - if (m->hops() > csHopLimit || m->peerchain_size() > csHopLimit) - { - fee_ = Resource::feeInvalidRequest; - JLOG(p_journal_.warn()) << - (m->hops() > csHopLimit ? - "Hops (" + std::to_string(m->hops()) + ") exceed limit" : - "Invalid Peerchain"); - return; - } + // DEPRECATED +} + +void +PeerImp::onMessage(std::shared_ptr const& m) +{ + // DEPRECATED +} + +void +PeerImp::onMessage (std::shared_ptr const& m) +{ + auto badData = [&](std::string msg) { + fee_ = Resource::feeBadData; + JLOG(p_journal_.warn()) << msg; + }; + + if (m->hops() > csHopLimit) + return badData("Invalid hops: " + std::to_string(m->hops())); + if (m->peerchain_size() > csHopLimit) + return badData("Invalid peer chain"); // Reply with shard info we may have if (auto shardStore = app_.getShardStore()) @@ -1038,7 +1050,7 @@ PeerImp::onMessage (std::shared_ptr const& m) auto shards {shardStore->getCompleteShards()}; if (!shards.empty()) { - protocol::TMShardInfo reply; + protocol::TMPeerShardInfo reply; reply.set_shardindexes(shards); if (m->has_lastlink()) @@ -1047,7 +1059,8 @@ PeerImp::onMessage (std::shared_ptr const& m) if (m->peerchain_size() > 0) *reply.mutable_peerchain() = m->peerchain(); - send(std::make_shared(reply, protocol::mtSHARD_INFO)); + send(std::make_shared( + reply, protocol::mtPEER_SHARD_INFO)); JLOG(p_journal_.trace()) << "Sent shard indexes " << shards; @@ -1063,31 +1076,41 @@ PeerImp::onMessage (std::shared_ptr const& m) if (m->hops() == 0) m->set_lastlink(true); - m->add_peerchain(id()); + m->add_peerchain()->set_nodepubkey( + publicKey_.data(), publicKey_.size()); + overlay_.foreach(send_if_not( - std::make_shared(*m, protocol::mtGET_SHARD_INFO), + std::make_shared(*m, protocol::mtGET_PEER_SHARD_INFO), match_peer(this))); } } void -PeerImp::onMessage(std::shared_ptr const& m) +PeerImp::onMessage(std::shared_ptr const& m) { - if (m->shardindexes().empty() || m->peerchain_size() > csHopLimit) - { + auto badData = [&](std::string msg) { fee_ = Resource::feeBadData; - JLOG(p_journal_.warn()) << - (m->shardindexes().empty() ? - "Missing shard indexes" : - "Invalid Peerchain"); - return; - } + JLOG(p_journal_.warn()) << msg; + }; + + if (m->shardindexes().empty()) + return badData("Missing shard indexes"); + if (m->peerchain_size() > csHopLimit) + return badData("Invalid peer chain"); + if (m->has_nodepubkey() && !publicKeyType(makeSlice(m->nodepubkey()))) + return badData("Invalid public key"); // Check if the message should be forwarded to another peer if (m->peerchain_size() > 0) { - auto const peerId {m->peerchain(m->peerchain_size() - 1)}; - if (auto peer = overlay_.findPeerByShortID(peerId)) + // Get the Public key of the last link in the peer chain + auto const s {makeSlice(m->peerchain( + m->peerchain_size() - 1).nodepubkey())}; + if (!publicKeyType(s)) + return badData("Invalid pubKey"); + PublicKey peerPubKey(s); + + if (auto peer = overlay_.findPeerByPublicKey(peerPubKey)) { if (!m->has_nodepubkey()) m->set_nodepubkey(publicKey_.data(), publicKey_.size()); @@ -1102,10 +1125,11 @@ PeerImp::onMessage(std::shared_ptr const& m) } m->mutable_peerchain()->RemoveLast(); - peer->send(std::make_shared(*m, protocol::mtSHARD_INFO)); + peer->send(std::make_shared( + *m, protocol::mtPEER_SHARD_INFO)); JLOG(p_journal_.trace()) << - "Relayed TMShardInfo to peer with IP " << + "Relayed TMPeerShardInfo to peer with IP " << remote_address_.address().to_string(); } else @@ -1191,19 +1215,11 @@ PeerImp::onMessage(std::shared_ptr const& m) break; } default: - fee_ = Resource::feeBadData; - return; + return badData("Invalid shard indexes"); } } } - // Get the Public key of the node reporting the shard info - PublicKey publicKey; - if (m->has_nodepubkey()) - publicKey = PublicKey(makeSlice(m->nodepubkey())); - else - publicKey = publicKey_; - // Get the IP of the node reporting the shard info beast::IP::Endpoint endpoint; if (m->has_endpoint()) @@ -1213,18 +1229,21 @@ PeerImp::onMessage(std::shared_ptr const& m) auto result { beast::IP::Endpoint::from_string_checked(m->endpoint())}; if (!result.second) - { - fee_ = Resource::feeBadData; - JLOG(p_journal_.warn()) << - "failed to parse incoming endpoint: {" << - m->endpoint() << "}"; - return; - } + return badData("Invalid incoming endpoint: " + m->endpoint()); endpoint = std::move(result.first); } } else if (crawl()) // Check if peer will share IP publicly + { endpoint = remote_address_; + } + + // Get the Public key of the node reporting the shard info + PublicKey publicKey; + if (m->has_nodepubkey()) + publicKey = PublicKey(makeSlice(m->nodepubkey())); + else + publicKey = publicKey_; { std::lock_guard l {shardInfoMutex_}; @@ -1248,7 +1267,7 @@ PeerImp::onMessage(std::shared_ptr const& m) } JLOG(p_journal_.trace()) << - "Consumed TMShardInfo originating from public key " << + "Consumed TMPeerShardInfo originating from public key " << toBase58(TokenType::NodePublic, publicKey) << " shard indexes " << m->shardindexes(); diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index c7aa95b40c..b35f784818 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -430,6 +430,8 @@ public: void onMessage (std::shared_ptr const& m); void onMessage (std::shared_ptr const& m); void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); + void onMessage (std::shared_ptr const& m); void onMessage (std::shared_ptr const& m); void onMessage (std::shared_ptr const& m); void onMessage (std::shared_ptr const& m); diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index 2d61874475..129fb295fa 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -41,24 +41,26 @@ protocolMessageName (int type) { switch (type) { - case protocol::mtHELLO: return "hello"; - case protocol::mtMANIFESTS: return "manifests"; - case protocol::mtPING: return "ping"; - case protocol::mtPROOFOFWORK: return "proof_of_work"; - case protocol::mtCLUSTER: return "cluster"; - case protocol::mtGET_SHARD_INFO: return "get_shard_info"; - case protocol::mtSHARD_INFO: return "shard_info"; - case protocol::mtGET_PEERS: return "get_peers"; - case protocol::mtPEERS: return "peers"; - case protocol::mtENDPOINTS: return "endpoints"; - case protocol::mtTRANSACTION: return "tx"; - case protocol::mtGET_LEDGER: return "get_ledger"; - case protocol::mtLEDGER_DATA: return "ledger_data"; - case protocol::mtPROPOSE_LEDGER: return "propose"; - case protocol::mtSTATUS_CHANGE: return "status"; - case protocol::mtHAVE_SET: return "have_set"; - case protocol::mtVALIDATION: return "validation"; - case protocol::mtGET_OBJECTS: return "get_objects"; + case protocol::mtHELLO: return "hello"; + case protocol::mtMANIFESTS: return "manifests"; + case protocol::mtPING: return "ping"; + case protocol::mtPROOFOFWORK: return "proof_of_work"; + case protocol::mtCLUSTER: return "cluster"; + case protocol::mtGET_SHARD_INFO: return "get_shard_info"; + case protocol::mtSHARD_INFO: return "shard_info"; + case protocol::mtGET_PEER_SHARD_INFO: return "get_peer_shard_info"; + case protocol::mtPEER_SHARD_INFO: return "peer_shard_info"; + case protocol::mtGET_PEERS: return "get_peers"; + case protocol::mtPEERS: return "peers"; + case protocol::mtENDPOINTS: return "endpoints"; + case protocol::mtTRANSACTION: return "tx"; + case protocol::mtGET_LEDGER: return "get_ledger"; + case protocol::mtLEDGER_DATA: return "ledger_data"; + case protocol::mtPROPOSE_LEDGER: return "propose"; + case protocol::mtSTATUS_CHANGE: return "status"; + case protocol::mtHAVE_SET: return "have_set"; + case protocol::mtVALIDATION: return "validation"; + case protocol::mtGET_OBJECTS: return "get_objects"; default: break; }; @@ -128,23 +130,25 @@ invokeProtocolMessage (Buffers const& buffers, Handler& handler) switch (type) { - case protocol::mtHELLO: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtMANIFESTS: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtPING: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtCLUSTER: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtGET_SHARD_INFO:ec = detail::invoke (type, buffers, handler); break; - case protocol::mtSHARD_INFO: ec = detail::invoke(type, buffers, handler); break; - case protocol::mtGET_PEERS: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtPEERS: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtENDPOINTS: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtTRANSACTION: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtGET_LEDGER: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtLEDGER_DATA: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtPROPOSE_LEDGER:ec = detail::invoke (type, buffers, handler); break; - case protocol::mtSTATUS_CHANGE: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtHAVE_SET: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtVALIDATION: ec = detail::invoke (type, buffers, handler); break; - case protocol::mtGET_OBJECTS: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtHELLO: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtMANIFESTS: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtPING: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtCLUSTER: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtGET_SHARD_INFO: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtSHARD_INFO: ec = detail::invoke(type, buffers, handler); break; + case protocol::mtGET_PEER_SHARD_INFO: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtPEER_SHARD_INFO: ec = detail::invoke(type, buffers, handler); break; + case protocol::mtGET_PEERS: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtPEERS: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtENDPOINTS: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtTRANSACTION: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtGET_LEDGER: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtLEDGER_DATA: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtPROPOSE_LEDGER: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtSTATUS_CHANGE: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtHAVE_SET: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtVALIDATION: ec = detail::invoke (type, buffers, handler); break; + case protocol::mtGET_OBJECTS: ec = detail::invoke (type, buffers, handler); break; default: ec = handler.onMessageUnknown (type); break; diff --git a/src/ripple/overlay/impl/TrafficCount.cpp b/src/ripple/overlay/impl/TrafficCount.cpp index 9e9158ffd3..fc160e04e1 100644 --- a/src/ripple/overlay/impl/TrafficCount.cpp +++ b/src/ripple/overlay/impl/TrafficCount.cpp @@ -66,6 +66,8 @@ TrafficCount::category TrafficCount::categorize ( (type == protocol::mtENDPOINTS) || (type == protocol::mtGET_SHARD_INFO) || (type == protocol::mtSHARD_INFO) || + (type == protocol::mtGET_PEER_SHARD_INFO) || + (type == protocol::mtPEER_SHARD_INFO) || (type == protocol::mtPEERS) || (type == protocol::mtGET_PEERS)) return TrafficCount::category::CT_overlay; diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 27e78da19f..ee4dafe2bf 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -21,6 +21,8 @@ enum MessageType mtGET_OBJECTS = 42; mtGET_SHARD_INFO = 50; mtSHARD_INFO = 51; + mtGET_PEER_SHARD_INFO = 52; + mtPEER_SHARD_INFO = 53; // = 10; // = 11; @@ -132,19 +134,43 @@ message TMCluster // Request info on shards held message TMGetShardInfo { - required uint32 hops = 1; // number of hops to travel - optional bool lastLink = 2; // true if last link in the peer chain - repeated uint32 peerchain = 3; + required uint32 hops = 1 [deprecated=true]; // number of hops to travel + optional bool lastLink = 2 [deprecated=true]; // true if last link in the peer chain + repeated uint32 peerchain = 3 [deprecated=true]; // IDs used to route messages } // Info about shards held message TMShardInfo +{ + required string shardIndexes = 1 [deprecated=true]; // rangeSet of shard indexes + optional bytes nodePubKey = 2 [deprecated=true]; // The node's public key + optional string endpoint = 3 [deprecated=true]; // ipv6 or ipv4 address + optional bool lastLink = 4 [deprecated=true]; // true if last link in the peer chain + repeated uint32 peerchain = 5 [deprecated=true]; // IDs used to route messages +} + +// Node public key +message TMLink +{ + required bytes nodePubKey = 1; // node public key +} + +// Request info on shards held +message TMGetPeerShardInfo +{ + required uint32 hops = 1; // number of hops to travel + optional bool lastLink = 2; // true if last link in the peer chain + repeated TMLink peerChain = 3; // public keys used to route messages +} + +// Info about shards held +message TMPeerShardInfo { required string shardIndexes = 1; // rangeSet of shard indexes - optional bytes nodePubKey = 2; // The node's public key + optional bytes nodePubKey = 2; // node public key optional string endpoint = 3; // ipv6 or ipv4 address optional bool lastLink = 4; // true if last link in the peer chain - repeated uint32 peerchain = 5; // List of IDs used to route messages + repeated TMLink peerChain = 5; // public keys used to route messages } // A transaction can have only one input and one output.