Structured Overlay support for TTL limited messages:

When the [overlay] configuration key "expire" is set to 1, proposals
and validations will include a hops field. The hops is incremented with
each relay. Messages with a hop count will be dropped when they exceed
the TTL (Time to Live). Messages containing a hops field will not be
relayed or broadcast to older versions of rippled that don't understand
the field.

This change will not affect normal operation of the network or rippled
instances that do not set "expire" to 1.
This commit is contained in:
Vinnie Falco
2015-04-28 09:24:01 -07:00
committed by Tom Ritchford
parent c77a2f335a
commit 90bb53af20
9 changed files with 168 additions and 44 deletions

View File

@@ -988,9 +988,7 @@ private:
protocol::TMValidation val; protocol::TMValidation val;
val.set_validation (&validation[0], validation.size ()); val.set_validation (&validation[0], validation.size ());
// Send signed validation to all of our directly connected peers // Send signed validation to all of our directly connected peers
getApp ().overlay ().foreach (send_always ( getApp().overlay().send(val);
std::make_shared <Message> (
val, protocol::mtVALIDATION)));
WriteLog (lsINFO, LedgerConsensus) WriteLog (lsINFO, LedgerConsensus)
<< "CNF Val " << newLCLHash; << "CNF Val " << newLCLHash;
} }
@@ -1256,9 +1254,7 @@ private:
Blob sig = mOurPosition->sign (); Blob sig = mOurPosition->sign ();
prop.set_nodepubkey (&pubKey[0], pubKey.size ()); prop.set_nodepubkey (&pubKey[0], pubKey.size ());
prop.set_signature (&sig[0], sig.size ()); prop.set_signature (&sig[0], sig.size ());
getApp ().overlay ().foreach (send_always ( getApp().overlay().send(prop);
std::make_shared<Message> (
prop, protocol::mtPROPOSE_LEDGER)));
} }
/** Let peers know that we a particular transactions set so they /** Let peers know that we a particular transactions set so they
@@ -1681,11 +1677,6 @@ private:
Blob validation = v->getSigned (); Blob validation = v->getSigned ();
protocol::TMValidation val; protocol::TMValidation val;
val.set_validation (&validation[0], validation.size ()); val.set_validation (&validation[0], validation.size ());
#if 0
getApp ().overlay ().visit (RelayMessage (
std::make_shared <Message> (
val, protocol::mtVALIDATION)));
#endif
getApp().getOPs ().setLastValidation (v); getApp().getOPs ().setLastValidation (v);
WriteLog (lsWARNING, LedgerConsensus) << "Sending partial validation"; WriteLog (lsWARNING, LedgerConsensus) << "Sending partial validation";
} }

View File

@@ -1710,21 +1710,10 @@ void NetworkOPsImp::processTrustedProposal (
} }
if (relay) if (relay)
{ getApp().overlay().relay(*set,
std::set<Peer::id_t> peers; proposal->getSuppressionID());
if (getApp().getHashRouter ().swapSet (
proposal->getSuppressionID (), peers, SF_RELAYED))
{
getApp ().overlay ().foreach (send_if_not (
std::make_shared<Message> (
*set, protocol::mtPROPOSE_LEDGER),
peer_in_set(peers)));
}
}
else else
{
m_journal.info << "Not relaying trusted proposal"; m_journal.info << "Not relaying trusted proposal";
}
} }
} }

View File

@@ -23,6 +23,7 @@
#include <beast/container/const_container.h> #include <beast/container/const_container.h>
#include <beast/utility/ci_char_traits.h> #include <beast/utility/ci_char_traits.h>
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <boost/optional.hpp>
#include <map> #include <map>
#include <ostream> #include <ostream>
#include <string> #include <string>
@@ -139,6 +140,16 @@ public:
std::pair <std::string, bool> std::pair <std::string, bool>
find (std::string const& name) const; find (std::string const& name) const;
template <class T>
boost::optional<T>
get (std::string const& name) const
{
auto const iter = cont().find(name);
if (iter == cont().end())
return boost::none;
return boost::lexical_cast<T>(iter->second);
}
friend friend
std::ostream& std::ostream&
operator<< (std::ostream&, Section const& section); operator<< (std::ostream&, Section const& section);

View File

@@ -31,6 +31,7 @@
#include <beast/cxx14/type_traits.h> // <type_traits> #include <beast/cxx14/type_traits.h> // <type_traits>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/optional.hpp>
namespace boost { namespace asio { namespace ssl { class context; } } } namespace boost { namespace asio { namespace ssl { class context; } } }
@@ -65,6 +66,7 @@ public:
bool auto_connect = true; bool auto_connect = true;
Promote promote = Promote::automatic; Promote promote = Promote::automatic;
std::shared_ptr<boost::asio::ssl::context> context; std::shared_ptr<boost::asio::ssl::context> context;
bool expire = false;
}; };
typedef std::vector <Peer::ptr> PeerSequence; typedef std::vector <Peer::ptr> PeerSequence;
@@ -131,6 +133,28 @@ public:
Peer::ptr Peer::ptr
findPeerByShortID (Peer::id_t const& id) = 0; findPeerByShortID (Peer::id_t const& id) = 0;
/** Broadcast a proposal. */
virtual
void
send (protocol::TMProposeSet& m) = 0;
/** Broadcast a validation. */
virtual
void
send (protocol::TMValidation& m) = 0;
/** Relay a proposal. */
virtual
void
relay (protocol::TMProposeSet& m,
uint256 const& uid) = 0;
/** Relay a validation. */
virtual
void
relay (protocol::TMValidation& m,
uint256 const& uid) = 0;
/** Visit every active peer and return a value /** Visit every active peer and return a value
The functor must: The functor must:
- Be callable as: - Be callable as:

View File

@@ -18,6 +18,7 @@
//============================================================================== //==============================================================================
#include <BeastConfig.h> #include <BeastConfig.h>
#include <ripple/app/misc/IHashRouter.h>
#include <ripple/basics/Log.h> #include <ripple/basics/Log.h>
#include <ripple/basics/make_SSLContext.h> #include <ripple/basics/make_SSLContext.h>
#include <ripple/protocol/JsonFields.h> #include <ripple/protocol/JsonFields.h>
@@ -693,6 +694,75 @@ OverlayImpl::findPeerByShortID (Peer::id_t const& id)
return Peer::ptr(); return Peer::ptr();
} }
void
OverlayImpl::send (protocol::TMProposeSet& m)
{
if (setup_.expire)
m.set_hops(0);
auto const sm = std::make_shared<Message>(
m, protocol::mtPROPOSE_LEDGER);
for_each([&](std::shared_ptr<PeerImp> const& p)
{
if (! m.has_hops() || p->hopsAware())
p->send(sm);
});
}
void
OverlayImpl::send (protocol::TMValidation& m)
{
if (setup_.expire)
m.set_hops(0);
auto const sm = std::make_shared<Message>(
m, protocol::mtVALIDATION);
for_each([&](std::shared_ptr<PeerImp> const& p)
{
if (! m.has_hops() || p->hopsAware())
p->send(sm);
});
}
void
OverlayImpl::relay (protocol::TMProposeSet& m,
uint256 const& uid)
{
if (m.has_hops() && m.hops() >= maxTTL)
return;
std::set<Peer::id_t> skip;
if (! getApp().getHashRouter().swapSet (
uid, skip, SF_RELAYED))
return;
auto const sm = std::make_shared<Message>(
m, protocol::mtPROPOSE_LEDGER);
for_each([&](std::shared_ptr<PeerImp> const& p)
{
if (skip.find(p->id()) != skip.end())
return;
if (! m.has_hops() || p->hopsAware())
p->send(sm);
});
}
void
OverlayImpl::relay (protocol::TMValidation& m,
uint256 const& uid)
{
if (m.has_hops() && m.hops() >= maxTTL)
return;
std::set<Peer::id_t> skip;
if (! getApp().getHashRouter().swapSet (
uid, skip, SF_RELAYED))
return;
auto const sm = std::make_shared<Message>(
m, protocol::mtVALIDATION);
for_each([&](std::shared_ptr<PeerImp> const& p)
{
if (skip.find(p->id()) != skip.end())
return;
if (! m.has_hops() || p->hopsAware())
p->send(sm);
});
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void void
@@ -777,6 +847,7 @@ setup_Overlay (BasicConfig const& config)
else else
setup.promote = Overlay::Promote::automatic; setup.promote = Overlay::Promote::automatic;
setup.context = make_SSLContext(); setup.context = make_SSLContext();
setup.expire = get<bool>(section, "expire", false);
return setup; return setup;
} }

View File

@@ -46,6 +46,11 @@ namespace ripple {
class PeerImp; class PeerImp;
class BasicConfig; class BasicConfig;
enum
{
maxTTL = 2
};
class OverlayImpl : public Overlay class OverlayImpl : public Overlay
{ {
public: public:
@@ -174,6 +179,20 @@ public:
Peer::ptr Peer::ptr
findPeerByShortID (Peer::id_t const& id) override; findPeerByShortID (Peer::id_t const& id) override;
void
send (protocol::TMProposeSet& m) override;
void
send (protocol::TMValidation& m) override;
void
relay (protocol::TMProposeSet& m,
uint256 const& uid) override;
void
relay (protocol::TMValidation& m,
uint256 const& uid) override;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// //
// OverlayImpl // OverlayImpl

View File

@@ -36,8 +36,10 @@
#include <ripple/server/ServerHandler.h> #include <ripple/server/ServerHandler.h>
#include <ripple/protocol/BuildInfo.h> #include <ripple/protocol/BuildInfo.h>
#include <ripple/protocol/JsonFields.h> #include <ripple/protocol/JsonFields.h>
#include <beast/module/core/diagnostic/SemanticVersion.h>
#include <beast/streams/debug_ostream.h> #include <beast/streams/debug_ostream.h>
#include <beast/weak_fn.h> #include <beast/weak_fn.h>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_service.hpp>
#include <functional> #include <functional>
#include <beast/cxx14/memory.h> // <memory> #include <beast/cxx14/memory.h> // <memory>
@@ -100,6 +102,20 @@ PeerImp::run()
if(! strand_.running_in_this_thread()) if(! strand_.running_in_this_thread())
return strand_.post(std::bind ( return strand_.post(std::bind (
&PeerImp::run, shared_from_this())); &PeerImp::run, shared_from_this()));
{
auto s = getVersion();
if (boost::starts_with(s, "rippled-"))
{
s.erase(s.begin(), s.begin() + 8);
beast::SemanticVersion v;
if (v.parse(s))
{
beast::SemanticVersion av;
av.parse("0.28.0-rc3");
hopsAware_ = v >= av;
}
}
}
if (m_inbound) if (m_inbound)
{ {
doAccept(); doAccept();
@@ -1008,6 +1024,10 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMProposeSet> const& m)
{ {
protocol::TMProposeSet& set = *m; protocol::TMProposeSet& set = *m;
if (overlay_.setup().expire &&
set.has_hops() && ! slot_->cluster())
set.set_hops(set.hops() + 1);
// VFALCO Magic numbers are bad // VFALCO Magic numbers are bad
if ((set.closetime() + 180) < getApp().getOPs().getCloseTimeNC()) if ((set.closetime() + 180) < getApp().getOPs().getCloseTimeNC())
return; return;
@@ -1276,6 +1296,10 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMValidation> const& m)
error_code ec; error_code ec;
std::uint32_t closeTime = getApp().getOPs().getCloseTimeNC(); std::uint32_t closeTime = getApp().getOPs().getCloseTimeNC();
if (overlay_.setup().expire &&
m->has_hops() && ! slot_->cluster())
m->set_hops(m->hops() + 1);
if (m->validation ().size () < 50) if (m->validation ().size () < 50)
{ {
p_journal_.warning << "Validation: Too small"; p_journal_.warning << "Validation: Too small";
@@ -1651,15 +1675,7 @@ PeerImp::checkPropose (Job& job,
// relay untrusted proposal // relay untrusted proposal
p_journal_.trace << p_journal_.trace <<
"relaying UNTRUSTED proposal"; "relaying UNTRUSTED proposal";
std::set<Peer::id_t> peers; overlay_.relay(set, proposal->getSuppressionID());
if (getApp().getHashRouter ().swapSet (
proposal->getSuppressionID (), peers, SF_RELAYED))
{
overlay_.foreach (send_if_not (
std::make_shared<Message> (set, protocol::mtPROPOSE_LEDGER),
peer_in_set(peers)));
}
} }
else else
{ {
@@ -1688,15 +1704,9 @@ PeerImp::checkValidation (Job&, STValidation::pointer val,
validatorsConnection_->onValidation(*val); validatorsConnection_->onValidation(*val);
#endif #endif
std::set<Peer::id_t> peers; if (getApp().getOPs ().recvValidation(
if (getApp().getOPs ().recvValidation (val, std::to_string(id())) && val, std::to_string(id())))
getApp().getHashRouter ().swapSet ( overlay_.relay(*packet, signingHash);
signingHash, peers, SF_RELAYED))
{
overlay_.foreach (send_if_not (
std::make_shared<Message> (*packet, protocol::mtVALIDATION),
peer_in_set(peers)));
}
} }
catch (...) catch (...)
{ {

View File

@@ -153,6 +153,7 @@ private:
bool gracefulClose_ = false; bool gracefulClose_ = false;
std::unique_ptr <LoadEvent> load_event_; std::unique_ptr <LoadEvent> load_event_;
std::unique_ptr<Validators::Connection> validatorsConnection_; std::unique_ptr<Validators::Connection> validatorsConnection_;
bool hopsAware_ = false;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
@@ -237,6 +238,12 @@ public:
return slot_->cluster(); return slot_->cluster();
} }
bool
hopsAware() const
{
return hopsAware_;
}
void void
check(); check();

View File

@@ -173,6 +173,7 @@ message TMProposeSet
optional bool checkedSignature = 7; // node vouches signature is correct optional bool checkedSignature = 7; // node vouches signature is correct
repeated bytes addedTransactions = 10; // not required if number is large repeated bytes addedTransactions = 10; // not required if number is large
repeated bytes removedTransactions = 11; // not required if number is large repeated bytes removedTransactions = 11; // not required if number is large
optional uint32 hops = 12; // Number of hops traveled
} }
enum TxSetStatus enum TxSetStatus
@@ -192,8 +193,9 @@ message TMHaveTransactionSet
// Used to sign a final closed ledger after reprocessing // Used to sign a final closed ledger after reprocessing
message TMValidation message TMValidation
{ {
required bytes validation = 1; // in STValidation signed form required bytes validation = 1; // in STValidation signed form
optional bool checkedSignature = 2; // node vouches signature is correct optional bool checkedSignature = 2; // node vouches signature is correct
optional uint32 hops = 3; // Number of hops traveled
} }
message TMGetPeers message TMGetPeers