diff --git a/src/ripple/app/consensus/LedgerConsensus.cpp b/src/ripple/app/consensus/LedgerConsensus.cpp index 57d26205c..e99691789 100644 --- a/src/ripple/app/consensus/LedgerConsensus.cpp +++ b/src/ripple/app/consensus/LedgerConsensus.cpp @@ -988,9 +988,7 @@ private: protocol::TMValidation val; val.set_validation (&validation[0], validation.size ()); // Send signed validation to all of our directly connected peers - getApp ().overlay ().foreach (send_always ( - std::make_shared ( - val, protocol::mtVALIDATION))); + getApp().overlay().send(val); WriteLog (lsINFO, LedgerConsensus) << "CNF Val " << newLCLHash; } @@ -1256,9 +1254,7 @@ private: Blob sig = mOurPosition->sign (); prop.set_nodepubkey (&pubKey[0], pubKey.size ()); prop.set_signature (&sig[0], sig.size ()); - getApp ().overlay ().foreach (send_always ( - std::make_shared ( - prop, protocol::mtPROPOSE_LEDGER))); + getApp().overlay().send(prop); } /** Let peers know that we a particular transactions set so they @@ -1681,11 +1677,6 @@ private: Blob validation = v->getSigned (); protocol::TMValidation val; val.set_validation (&validation[0], validation.size ()); - #if 0 - getApp ().overlay ().visit (RelayMessage ( - std::make_shared ( - val, protocol::mtVALIDATION))); - #endif getApp().getOPs ().setLastValidation (v); WriteLog (lsWARNING, LedgerConsensus) << "Sending partial validation"; } diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index f3b6ba49b..254e361e3 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -1710,21 +1710,10 @@ void NetworkOPsImp::processTrustedProposal ( } if (relay) - { - std::set peers; - if (getApp().getHashRouter ().swapSet ( - proposal->getSuppressionID (), peers, SF_RELAYED)) - { - getApp ().overlay ().foreach (send_if_not ( - std::make_shared ( - *set, protocol::mtPROPOSE_LEDGER), - peer_in_set(peers))); - } - } + getApp().overlay().relay(*set, + proposal->getSuppressionID()); else - { m_journal.info << "Not relaying trusted proposal"; - } } } diff --git a/src/ripple/basics/BasicConfig.h b/src/ripple/basics/BasicConfig.h index 703d3c783..aff78bbbe 100644 --- a/src/ripple/basics/BasicConfig.h +++ b/src/ripple/basics/BasicConfig.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -139,6 +140,16 @@ public: std::pair find (std::string const& name) const; + template + boost::optional + get (std::string const& name) const + { + auto const iter = cont().find(name); + if (iter == cont().end()) + return boost::none; + return boost::lexical_cast(iter->second); + } + friend std::ostream& operator<< (std::ostream&, Section const& section); diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index f55666fb6..e28c47b6e 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -31,6 +31,7 @@ #include // #include #include +#include namespace boost { namespace asio { namespace ssl { class context; } } } @@ -65,6 +66,7 @@ public: bool auto_connect = true; Promote promote = Promote::automatic; std::shared_ptr context; + bool expire = false; }; typedef std::vector PeerSequence; @@ -131,6 +133,28 @@ public: Peer::ptr findPeerByShortID (Peer::id_t const& id) = 0; + /** Broadcast a proposal. */ + virtual + void + send (protocol::TMProposeSet& m) = 0; + + /** Broadcast a validation. */ + virtual + void + send (protocol::TMValidation& m) = 0; + + /** Relay a proposal. */ + virtual + void + relay (protocol::TMProposeSet& m, + uint256 const& uid) = 0; + + /** Relay a validation. */ + virtual + void + relay (protocol::TMValidation& m, + uint256 const& uid) = 0; + /** Visit every active peer and return a value The functor must: - Be callable as: diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index d7361cd06..7bcf43708 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -18,6 +18,7 @@ //============================================================================== #include +#include #include #include #include @@ -693,6 +694,75 @@ OverlayImpl::findPeerByShortID (Peer::id_t const& id) return Peer::ptr(); } +void +OverlayImpl::send (protocol::TMProposeSet& m) +{ + if (setup_.expire) + m.set_hops(0); + auto const sm = std::make_shared( + m, protocol::mtPROPOSE_LEDGER); + for_each([&](std::shared_ptr const& p) + { + if (! m.has_hops() || p->hopsAware()) + p->send(sm); + }); +} +void +OverlayImpl::send (protocol::TMValidation& m) +{ + if (setup_.expire) + m.set_hops(0); + auto const sm = std::make_shared( + m, protocol::mtVALIDATION); + for_each([&](std::shared_ptr const& p) + { + if (! m.has_hops() || p->hopsAware()) + p->send(sm); + }); +} + +void +OverlayImpl::relay (protocol::TMProposeSet& m, + uint256 const& uid) +{ + if (m.has_hops() && m.hops() >= maxTTL) + return; + std::set skip; + if (! getApp().getHashRouter().swapSet ( + uid, skip, SF_RELAYED)) + return; + auto const sm = std::make_shared( + m, protocol::mtPROPOSE_LEDGER); + for_each([&](std::shared_ptr const& p) + { + if (skip.find(p->id()) != skip.end()) + return; + if (! m.has_hops() || p->hopsAware()) + p->send(sm); + }); +} + +void +OverlayImpl::relay (protocol::TMValidation& m, + uint256 const& uid) +{ + if (m.has_hops() && m.hops() >= maxTTL) + return; + std::set skip; + if (! getApp().getHashRouter().swapSet ( + uid, skip, SF_RELAYED)) + return; + auto const sm = std::make_shared( + m, protocol::mtVALIDATION); + for_each([&](std::shared_ptr const& p) + { + if (skip.find(p->id()) != skip.end()) + return; + if (! m.has_hops() || p->hopsAware()) + p->send(sm); + }); +} + //------------------------------------------------------------------------------ void @@ -777,6 +847,7 @@ setup_Overlay (BasicConfig const& config) else setup.promote = Overlay::Promote::automatic; setup.context = make_SSLContext(); + setup.expire = get(section, "expire", false); return setup; } diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 46dd0ad37..5b1ce9521 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -46,6 +46,11 @@ namespace ripple { class PeerImp; class BasicConfig; +enum +{ + maxTTL = 2 +}; + class OverlayImpl : public Overlay { public: @@ -174,6 +179,20 @@ public: Peer::ptr findPeerByShortID (Peer::id_t const& id) override; + void + send (protocol::TMProposeSet& m) override; + + void + send (protocol::TMValidation& m) override; + + void + relay (protocol::TMProposeSet& m, + uint256 const& uid) override; + + void + relay (protocol::TMValidation& m, + uint256 const& uid) override; + //-------------------------------------------------------------------------- // // OverlayImpl diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 49639186e..96a3f86c3 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -36,8 +36,10 @@ #include #include #include +#include #include #include +#include #include #include #include // @@ -100,6 +102,20 @@ PeerImp::run() if(! strand_.running_in_this_thread()) return strand_.post(std::bind ( &PeerImp::run, shared_from_this())); + { + auto s = getVersion(); + if (boost::starts_with(s, "rippled-")) + { + s.erase(s.begin(), s.begin() + 8); + beast::SemanticVersion v; + if (v.parse(s)) + { + beast::SemanticVersion av; + av.parse("0.28.0-rc3"); + hopsAware_ = v >= av; + } + } + } if (m_inbound) { doAccept(); @@ -1008,6 +1024,10 @@ PeerImp::onMessage (std::shared_ptr const& m) { protocol::TMProposeSet& set = *m; + if (overlay_.setup().expire && + set.has_hops() && ! slot_->cluster()) + set.set_hops(set.hops() + 1); + // VFALCO Magic numbers are bad if ((set.closetime() + 180) < getApp().getOPs().getCloseTimeNC()) return; @@ -1276,6 +1296,10 @@ PeerImp::onMessage (std::shared_ptr const& m) error_code ec; std::uint32_t closeTime = getApp().getOPs().getCloseTimeNC(); + if (overlay_.setup().expire && + m->has_hops() && ! slot_->cluster()) + m->set_hops(m->hops() + 1); + if (m->validation ().size () < 50) { p_journal_.warning << "Validation: Too small"; @@ -1651,15 +1675,7 @@ PeerImp::checkPropose (Job& job, // relay untrusted proposal p_journal_.trace << "relaying UNTRUSTED proposal"; - std::set peers; - - if (getApp().getHashRouter ().swapSet ( - proposal->getSuppressionID (), peers, SF_RELAYED)) - { - overlay_.foreach (send_if_not ( - std::make_shared (set, protocol::mtPROPOSE_LEDGER), - peer_in_set(peers))); - } + overlay_.relay(set, proposal->getSuppressionID()); } else { @@ -1688,15 +1704,9 @@ PeerImp::checkValidation (Job&, STValidation::pointer val, validatorsConnection_->onValidation(*val); #endif - std::set peers; - if (getApp().getOPs ().recvValidation (val, std::to_string(id())) && - getApp().getHashRouter ().swapSet ( - signingHash, peers, SF_RELAYED)) - { - overlay_.foreach (send_if_not ( - std::make_shared (*packet, protocol::mtVALIDATION), - peer_in_set(peers))); - } + if (getApp().getOPs ().recvValidation( + val, std::to_string(id()))) + overlay_.relay(*packet, signingHash); } catch (...) { diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 632fd000e..4e245bab1 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -153,6 +153,7 @@ private: bool gracefulClose_ = false; std::unique_ptr load_event_; std::unique_ptr validatorsConnection_; + bool hopsAware_ = false; //-------------------------------------------------------------------------- @@ -237,6 +238,12 @@ public: return slot_->cluster(); } + bool + hopsAware() const + { + return hopsAware_; + } + void check(); diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 69549b8c0..e47ff3057 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -173,6 +173,7 @@ message TMProposeSet optional bool checkedSignature = 7; // node vouches signature is correct repeated bytes addedTransactions = 10; // not required if number is large repeated bytes removedTransactions = 11; // not required if number is large + optional uint32 hops = 12; // Number of hops traveled } enum TxSetStatus @@ -192,8 +193,9 @@ message TMHaveTransactionSet // Used to sign a final closed ledger after reprocessing message TMValidation { - required bytes validation = 1; // in STValidation signed form - optional bool checkedSignature = 2; // node vouches signature is correct + required bytes validation = 1; // in STValidation signed form + optional bool checkedSignature = 2; // node vouches signature is correct + optional uint32 hops = 3; // Number of hops traveled } message TMGetPeers