From 90bb53af20990dc2d4ae767c634785fd1c614345 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Tue, 28 Apr 2015 09:24:01 -0700 Subject: [PATCH] Structured Overlay support for TTL limited messages: When the [overlay] configuration key "expire" is set to 1, proposals and validations will include a hops field. The hops is incremented with each relay. Messages with a hop count will be dropped when they exceed the TTL (Time to Live). Messages containing a hops field will not be relayed or broadcast to older versions of rippled that don't understand the field. This change will not affect normal operation of the network or rippled instances that do not set "expire" to 1. --- src/ripple/app/consensus/LedgerConsensus.cpp | 13 +--- src/ripple/app/misc/NetworkOPs.cpp | 15 +---- src/ripple/basics/BasicConfig.h | 11 +++ src/ripple/overlay/Overlay.h | 24 +++++++ src/ripple/overlay/impl/OverlayImpl.cpp | 71 ++++++++++++++++++++ src/ripple/overlay/impl/OverlayImpl.h | 19 ++++++ src/ripple/overlay/impl/PeerImp.cpp | 46 ++++++++----- src/ripple/overlay/impl/PeerImp.h | 7 ++ src/ripple/proto/ripple.proto | 6 +- 9 files changed, 168 insertions(+), 44 deletions(-) diff --git a/src/ripple/app/consensus/LedgerConsensus.cpp b/src/ripple/app/consensus/LedgerConsensus.cpp index 57d26205c..e99691789 100644 --- a/src/ripple/app/consensus/LedgerConsensus.cpp +++ b/src/ripple/app/consensus/LedgerConsensus.cpp @@ -988,9 +988,7 @@ private: protocol::TMValidation val; val.set_validation (&validation[0], validation.size ()); // Send signed validation to all of our directly connected peers - getApp ().overlay ().foreach (send_always ( - std::make_shared ( - val, protocol::mtVALIDATION))); + getApp().overlay().send(val); WriteLog (lsINFO, LedgerConsensus) << "CNF Val " << newLCLHash; } @@ -1256,9 +1254,7 @@ private: Blob sig = mOurPosition->sign (); prop.set_nodepubkey (&pubKey[0], pubKey.size ()); prop.set_signature (&sig[0], sig.size ()); - getApp ().overlay ().foreach (send_always ( - std::make_shared ( - prop, protocol::mtPROPOSE_LEDGER))); + getApp().overlay().send(prop); } /** Let peers know that we a particular transactions set so they @@ -1681,11 +1677,6 @@ private: Blob validation = v->getSigned (); protocol::TMValidation val; val.set_validation (&validation[0], validation.size ()); - #if 0 - getApp ().overlay ().visit (RelayMessage ( - std::make_shared ( - val, protocol::mtVALIDATION))); - #endif getApp().getOPs ().setLastValidation (v); WriteLog (lsWARNING, LedgerConsensus) << "Sending partial validation"; } diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index f3b6ba49b..254e361e3 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -1710,21 +1710,10 @@ void NetworkOPsImp::processTrustedProposal ( } if (relay) - { - std::set peers; - if (getApp().getHashRouter ().swapSet ( - proposal->getSuppressionID (), peers, SF_RELAYED)) - { - getApp ().overlay ().foreach (send_if_not ( - std::make_shared ( - *set, protocol::mtPROPOSE_LEDGER), - peer_in_set(peers))); - } - } + getApp().overlay().relay(*set, + proposal->getSuppressionID()); else - { m_journal.info << "Not relaying trusted proposal"; - } } } diff --git a/src/ripple/basics/BasicConfig.h b/src/ripple/basics/BasicConfig.h index 703d3c783..aff78bbbe 100644 --- a/src/ripple/basics/BasicConfig.h +++ b/src/ripple/basics/BasicConfig.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -139,6 +140,16 @@ public: std::pair find (std::string const& name) const; + template + boost::optional + get (std::string const& name) const + { + auto const iter = cont().find(name); + if (iter == cont().end()) + return boost::none; + return boost::lexical_cast(iter->second); + } + friend std::ostream& operator<< (std::ostream&, Section const& section); diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index f55666fb6..e28c47b6e 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -31,6 +31,7 @@ #include // #include #include +#include namespace boost { namespace asio { namespace ssl { class context; } } } @@ -65,6 +66,7 @@ public: bool auto_connect = true; Promote promote = Promote::automatic; std::shared_ptr context; + bool expire = false; }; typedef std::vector PeerSequence; @@ -131,6 +133,28 @@ public: Peer::ptr findPeerByShortID (Peer::id_t const& id) = 0; + /** Broadcast a proposal. */ + virtual + void + send (protocol::TMProposeSet& m) = 0; + + /** Broadcast a validation. */ + virtual + void + send (protocol::TMValidation& m) = 0; + + /** Relay a proposal. */ + virtual + void + relay (protocol::TMProposeSet& m, + uint256 const& uid) = 0; + + /** Relay a validation. */ + virtual + void + relay (protocol::TMValidation& m, + uint256 const& uid) = 0; + /** Visit every active peer and return a value The functor must: - Be callable as: diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index d7361cd06..7bcf43708 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -18,6 +18,7 @@ //============================================================================== #include +#include #include #include #include @@ -693,6 +694,75 @@ OverlayImpl::findPeerByShortID (Peer::id_t const& id) return Peer::ptr(); } +void +OverlayImpl::send (protocol::TMProposeSet& m) +{ + if (setup_.expire) + m.set_hops(0); + auto const sm = std::make_shared( + m, protocol::mtPROPOSE_LEDGER); + for_each([&](std::shared_ptr const& p) + { + if (! m.has_hops() || p->hopsAware()) + p->send(sm); + }); +} +void +OverlayImpl::send (protocol::TMValidation& m) +{ + if (setup_.expire) + m.set_hops(0); + auto const sm = std::make_shared( + m, protocol::mtVALIDATION); + for_each([&](std::shared_ptr const& p) + { + if (! m.has_hops() || p->hopsAware()) + p->send(sm); + }); +} + +void +OverlayImpl::relay (protocol::TMProposeSet& m, + uint256 const& uid) +{ + if (m.has_hops() && m.hops() >= maxTTL) + return; + std::set skip; + if (! getApp().getHashRouter().swapSet ( + uid, skip, SF_RELAYED)) + return; + auto const sm = std::make_shared( + m, protocol::mtPROPOSE_LEDGER); + for_each([&](std::shared_ptr const& p) + { + if (skip.find(p->id()) != skip.end()) + return; + if (! m.has_hops() || p->hopsAware()) + p->send(sm); + }); +} + +void +OverlayImpl::relay (protocol::TMValidation& m, + uint256 const& uid) +{ + if (m.has_hops() && m.hops() >= maxTTL) + return; + std::set skip; + if (! getApp().getHashRouter().swapSet ( + uid, skip, SF_RELAYED)) + return; + auto const sm = std::make_shared( + m, protocol::mtVALIDATION); + for_each([&](std::shared_ptr const& p) + { + if (skip.find(p->id()) != skip.end()) + return; + if (! m.has_hops() || p->hopsAware()) + p->send(sm); + }); +} + //------------------------------------------------------------------------------ void @@ -777,6 +847,7 @@ setup_Overlay (BasicConfig const& config) else setup.promote = Overlay::Promote::automatic; setup.context = make_SSLContext(); + setup.expire = get(section, "expire", false); return setup; } diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 46dd0ad37..5b1ce9521 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -46,6 +46,11 @@ namespace ripple { class PeerImp; class BasicConfig; +enum +{ + maxTTL = 2 +}; + class OverlayImpl : public Overlay { public: @@ -174,6 +179,20 @@ public: Peer::ptr findPeerByShortID (Peer::id_t const& id) override; + void + send (protocol::TMProposeSet& m) override; + + void + send (protocol::TMValidation& m) override; + + void + relay (protocol::TMProposeSet& m, + uint256 const& uid) override; + + void + relay (protocol::TMValidation& m, + uint256 const& uid) override; + //-------------------------------------------------------------------------- // // OverlayImpl diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 49639186e..96a3f86c3 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -36,8 +36,10 @@ #include #include #include +#include #include #include +#include #include #include #include // @@ -100,6 +102,20 @@ PeerImp::run() if(! strand_.running_in_this_thread()) return strand_.post(std::bind ( &PeerImp::run, shared_from_this())); + { + auto s = getVersion(); + if (boost::starts_with(s, "rippled-")) + { + s.erase(s.begin(), s.begin() + 8); + beast::SemanticVersion v; + if (v.parse(s)) + { + beast::SemanticVersion av; + av.parse("0.28.0-rc3"); + hopsAware_ = v >= av; + } + } + } if (m_inbound) { doAccept(); @@ -1008,6 +1024,10 @@ PeerImp::onMessage (std::shared_ptr const& m) { protocol::TMProposeSet& set = *m; + if (overlay_.setup().expire && + set.has_hops() && ! slot_->cluster()) + set.set_hops(set.hops() + 1); + // VFALCO Magic numbers are bad if ((set.closetime() + 180) < getApp().getOPs().getCloseTimeNC()) return; @@ -1276,6 +1296,10 @@ PeerImp::onMessage (std::shared_ptr const& m) error_code ec; std::uint32_t closeTime = getApp().getOPs().getCloseTimeNC(); + if (overlay_.setup().expire && + m->has_hops() && ! slot_->cluster()) + m->set_hops(m->hops() + 1); + if (m->validation ().size () < 50) { p_journal_.warning << "Validation: Too small"; @@ -1651,15 +1675,7 @@ PeerImp::checkPropose (Job& job, // relay untrusted proposal p_journal_.trace << "relaying UNTRUSTED proposal"; - std::set peers; - - if (getApp().getHashRouter ().swapSet ( - proposal->getSuppressionID (), peers, SF_RELAYED)) - { - overlay_.foreach (send_if_not ( - std::make_shared (set, protocol::mtPROPOSE_LEDGER), - peer_in_set(peers))); - } + overlay_.relay(set, proposal->getSuppressionID()); } else { @@ -1688,15 +1704,9 @@ PeerImp::checkValidation (Job&, STValidation::pointer val, validatorsConnection_->onValidation(*val); #endif - std::set peers; - if (getApp().getOPs ().recvValidation (val, std::to_string(id())) && - getApp().getHashRouter ().swapSet ( - signingHash, peers, SF_RELAYED)) - { - overlay_.foreach (send_if_not ( - std::make_shared (*packet, protocol::mtVALIDATION), - peer_in_set(peers))); - } + if (getApp().getOPs ().recvValidation( + val, std::to_string(id()))) + overlay_.relay(*packet, signingHash); } catch (...) { diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 632fd000e..4e245bab1 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -153,6 +153,7 @@ private: bool gracefulClose_ = false; std::unique_ptr load_event_; std::unique_ptr validatorsConnection_; + bool hopsAware_ = false; //-------------------------------------------------------------------------- @@ -237,6 +238,12 @@ public: return slot_->cluster(); } + bool + hopsAware() const + { + return hopsAware_; + } + void check(); diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 69549b8c0..e47ff3057 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -173,6 +173,7 @@ message TMProposeSet optional bool checkedSignature = 7; // node vouches signature is correct repeated bytes addedTransactions = 10; // not required if number is large repeated bytes removedTransactions = 11; // not required if number is large + optional uint32 hops = 12; // Number of hops traveled } enum TxSetStatus @@ -192,8 +193,9 @@ message TMHaveTransactionSet // Used to sign a final closed ledger after reprocessing message TMValidation { - required bytes validation = 1; // in STValidation signed form - optional bool checkedSignature = 2; // node vouches signature is correct + required bytes validation = 1; // in STValidation signed form + optional bool checkedSignature = 2; // node vouches signature is correct + optional uint32 hops = 3; // Number of hops traveled } message TMGetPeers