Track peer "sanity" (RIPD-836)

* Each peer has a "sane/insane/unknown" status
* Status updated based on peer ledger sequence
* Status reported in peer json
* Only sane peers preferred for historical ledgers
* Overlay endpoints only accepted from known sane peers
* Untrusted proposals not relayed from insane peers
* Untrusted validations not relayed from insane peers
* Transactions from insane peers are not processed
* Periodically drop outbound connections to bad peers
* Bad peers get bootcache valence of zero

Peer "sanity" is based on the ledger sequence number they are on.  We
quickly become able to assess this based on current trusted validations.
We quarantine rogue messages and disconnect bad outbound connections to
help maintain the configured number of good outbound connections.
This commit is contained in:
David Schwartz
2015-04-03 12:32:06 -07:00
committed by Miguel Portilla
parent acf2833362
commit 0c134582ca
11 changed files with 241 additions and 6 deletions

View File

@@ -710,9 +710,18 @@ public:
if (!ledger) if (!ledger)
{ {
// FIXME: We should really only fetch if the ledger if ((seq != 0) && (getValidLedgerIndex() == 0))
//has sufficient validations to accept it {
// Set peers sane early if we can
if (getApp().getValidations().getTrustedValidationCount (hash) >=
mMinValidations)
{
getApp().overlay().checkSanity (seq);
}
}
// FIXME: We may not want to fetch a ledger with just one
// trusted validation
InboundLedger::pointer l = InboundLedger::pointer l =
getApp().getInboundLedgers().findCreate(hash, 0, InboundLedger::fcGENERIC); getApp().getInboundLedgers().findCreate(hash, 0, InboundLedger::fcGENERIC);
if (l && l->isComplete() && !l->isFailed()) if (l && l->isComplete() && !l->isFailed())

View File

@@ -113,6 +113,19 @@ public:
PeerSequence PeerSequence
getActivePeers () = 0; getActivePeers () = 0;
/** Calls the checkSanity function on each peer.
    Every peer is handed the same value and re-evaluates its own
    sanity status against it.
@param index the value to pass to the peer's checkSanity function
             (PeerImp treats it as the ledger sequence of a
             recently-validated ledger)
*/
virtual
void
checkSanity (std::uint32_t index) = 0;
/** Calls the check function on each peer.
    Intended to be invoked periodically so that each peer can decide
    whether its connection is still worth keeping.
*/
virtual
void
check () = 0;
/** Returns the peer with the matching short id, or null. */ /** Returns the peer with the matching short id, or null. */
virtual virtual
Peer::ptr Peer::ptr

View File

@@ -108,6 +108,9 @@ OverlayImpl::Timer::on_timer (error_code ec)
overlay_.sendEndpoints(); overlay_.sendEndpoints();
overlay_.autoConnect(); overlay_.autoConnect();
if ((++overlay_.timer_count_ % Tuning::checkSeconds) == 0)
overlay_.check();
timer_.expires_from_now (std::chrono::seconds(1)); timer_.expires_from_now (std::chrono::seconds(1));
timer_.async_wait(overlay_.strand_.wrap(std::bind( timer_.async_wait(overlay_.strand_.wrap(std::bind(
&Timer::on_timer, shared_from_this(), &Timer::on_timer, shared_from_this(),
@@ -136,6 +139,7 @@ OverlayImpl::OverlayImpl (
get_seconds_clock(), deprecatedLogs().journal("PeerFinder"), config)) get_seconds_clock(), deprecatedLogs().journal("PeerFinder"), config))
, m_resolver (resolver) , m_resolver (resolver)
, next_id_(1) , next_id_(1)
, timer_count_(0)
{ {
beast::PropertyStream::Source::add (m_peerFinder.get()); beast::PropertyStream::Source::add (m_peerFinder.get());
} }
@@ -585,8 +589,7 @@ OverlayImpl::crawl()
std::lock_guard <decltype(mutex_)> lock (mutex_); std::lock_guard <decltype(mutex_)> lock (mutex_);
for (auto const& e : m_publicKeyMap) for (auto const& e : m_publicKeyMap)
{ {
auto const sp = e.second.lock(); if (auto const sp = e.second.lock())
if (sp)
{ {
auto& pv = av.append(Json::Value(Json::objectValue)); auto& pv = av.append(Json::Value(Json::objectValue));
pv[jss::type] = "peer"; pv[jss::type] = "peer";
@@ -648,6 +651,30 @@ OverlayImpl::getActivePeers()
return ret; return ret;
} }
/** Hand a ledger index to every peer that is still alive so each one
    can re-evaluate its sanity against it.
    @param index forwarded unchanged to each peer's checkSanity
*/
void
OverlayImpl::checkSanity (std::uint32_t index)
{
    std::lock_guard <decltype(mutex_)> guard (mutex_);

    for (auto const& entry : m_publicKeyMap)
    {
        // Entries are weak references; skip peers that have gone away.
        auto const peer = entry.second.lock();
        if (! peer)
            continue;

        peer->checkSanity (index);
    }
}
/** Run the per-peer check on every peer that is still alive. */
void
OverlayImpl::check ()
{
    std::lock_guard <decltype(mutex_)> guard (mutex_);

    for (auto const& entry : m_publicKeyMap)
    {
        // Entries are weak references; skip peers that have gone away.
        auto const peer = entry.second.lock();
        if (! peer)
            continue;

        peer->check ();
    }
}
Peer::ptr Peer::ptr
OverlayImpl::findPeerByShortID (Peer::id_t const& id) OverlayImpl::findPeerByShortID (Peer::id_t const& id)
{ {

View File

@@ -118,6 +118,8 @@ private:
std::atomic <Peer::id_t> next_id_; std::atomic <Peer::id_t> next_id_;
int timer_count_;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
public: public:
@@ -163,6 +165,12 @@ public:
PeerSequence PeerSequence
getActivePeers() override; getActivePeers() override;
void
check () override;
void
checkSanity (std::uint32_t) override;
Peer::ptr Peer::ptr
findPeerByShortID (Peer::id_t const& id) override; findPeerByShortID (Peer::id_t const& id) override;

View File

@@ -67,6 +67,8 @@ PeerImp::PeerImp (id_t id, endpoint_type remote_endpoint,
, overlay_ (overlay) , overlay_ (overlay)
, m_inbound (true) , m_inbound (true)
, state_ (State::active) , state_ (State::active)
, sanity_ (Sanity::unknown)
, insaneTime_ (clock_type::now())
, publicKey_(publicKey) , publicKey_(publicKey)
, hello_(hello) , hello_(hello)
, usage_(consumer) , usage_(consumer)
@@ -251,6 +253,21 @@ PeerImp::json()
if (closedLedgerHash_ != zero) if (closedLedgerHash_ != zero)
ret[jss::ledger] = to_string (closedLedgerHash_); ret[jss::ledger] = to_string (closedLedgerHash_);
switch (sanity_.load ())
{
case Sanity::insane:
ret[jss::sanity] = "insane";
break;
case Sanity::unknown:
ret[jss::sanity] = "unknown";
break;
case Sanity::sane:
// Nothing to do here
break;
}
if (last_status_.has_newstatus ()) if (last_status_.has_newstatus ())
{ {
switch (last_status_.newstatus ()) switch (last_status_.newstatus ())
@@ -291,7 +308,8 @@ bool
PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const
{ {
std::lock_guard<std::mutex> sl(recentLock_); std::lock_guard<std::mutex> sl(recentLock_);
if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_)) if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
(sanity_.load() == Sanity::sane))
return true; return true;
return std::find (recentLedgers_.begin(), return std::find (recentLedgers_.begin(),
recentLedgers_.end(), hash) != recentLedgers_.end(); recentLedgers_.end(), hash) != recentLedgers_.end();
@@ -801,6 +819,12 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMPeers> const& m)
void void
PeerImp::onMessage (std::shared_ptr <protocol::TMEndpoints> const& m) PeerImp::onMessage (std::shared_ptr <protocol::TMEndpoints> const& m)
{ {
if (sanity_.load() != Sanity::sane)
{
// Don't allow endpoints from peer not known sane
return;
}
std::vector <PeerFinder::Endpoint> endpoints; std::vector <PeerFinder::Endpoint> endpoints;
endpoints.reserve (m->endpoints().size()); endpoints.reserve (m->endpoints().size());
@@ -844,6 +868,9 @@ void
PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m) PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
{ {
if (sanity_.load() == Sanity::insane)
return;
if (getApp().getOPs().isNeedNetworkLedger ()) if (getApp().getOPs().isNeedNetworkLedger ())
{ {
// If we've never been in synch, there's nothing we can do // If we've never been in synch, there's nothing we can do
@@ -1035,6 +1062,13 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMProposeSet> const& m)
} }
bool isTrusted = getApp().getUNL ().nodeInUNL (signerPublic); bool isTrusted = getApp().getUNL ().nodeInUNL (signerPublic);
if (!isTrusted && (sanity_.load() == Sanity::insane))
{
p_journal_.debug << "Proposal: Dropping UNTRUSTED (insane)";
return;
}
if (!isTrusted && getApp().getFeeTrack ().isLoadedLocal ()) if (!isTrusted && getApp().getFeeTrack ().isLoadedLocal ())
{ {
p_journal_.debug << "Proposal: Dropping UNTRUSTED (load)"; p_journal_.debug << "Proposal: Dropping UNTRUSTED (load)";
@@ -1119,6 +1153,85 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
if (maxLedger_ == 0) if (maxLedger_ == 0)
minLedger_ = 0; minLedger_ = 0;
} }
if (m->has_ledgerseq() &&
getApp().getLedgerMaster().getValidatedLedgerAge() < 120)
{
checkSanity (m->ledgerseq(), getApp().getLedgerMaster().getValidLedgerIndex());
}
}
/** Compare this peer's highest known ledger with a validated sequence.
    @param validationSeq ledger sequence of a recently-validated ledger
*/
void
PeerImp::checkSanity (std::uint32_t validationSeq)
{
    std::uint32_t peerSeq;
    {
        // Snapshot the highest ledger sequence this peer has reported;
        // the lock is released before the comparison below.
        std::lock_guard<std::mutex> sl (recentLock_);
        peerSeq = maxLedger_;
    }

    // Zero means the peer has not told us its ledger range yet,
    // so there is nothing to compare.
    if (peerSeq == 0)
        return;

    checkSanity (peerSeq, validationSeq);
}
/** Update the peer's sanity from the distance between two ledger
    sequences (order of the arguments does not matter).

    - distance below Tuning::saneLedgerLimit: the peer becomes sane
    - distance above Tuning::insaneLedgerLimit: the peer becomes insane
      and the time of that transition is recorded
    - anything in between leaves the current status untouched

    @param seq1 one ledger sequence (typically the peer's)
    @param seq2 the other ledger sequence (typically a validated one)
*/
void
PeerImp::checkSanity (std::uint32_t seq1, std::uint32_t seq2)
{
    // Keep the distance unsigned. Storing it in an int turns a gap of
    // 2^31 or more into a negative number, which would make a peer that
    // reports a wildly wrong sequence look "sane".
    std::uint32_t const diff =
        (seq1 > seq2) ? (seq1 - seq2) : (seq2 - seq1);

    if (diff < static_cast <std::uint32_t> (Tuning::saneLedgerLimit))
    {
        // The peer's ledger sequence is close to the validation's
        sanity_ = Sanity::sane;
    }

    if ((diff > static_cast <std::uint32_t> (Tuning::insaneLedgerLimit)) &&
        (sanity_.load() != Sanity::insane))
    {
        // The peer's ledger sequence is way off the validation's.
        // Only stamp the time on the transition into insane so the
        // timer in check() measures how long the peer has been bad.
        std::lock_guard<std::mutex> sl(recentLock_);
        sanity_ = Sanity::insane;
        insaneTime_ = clock_type::now();
    }
}
// Should this connection be rejected
// and considered a failure
void PeerImp::check ()
{
if (m_inbound || (sanity_.load() == Sanity::sane))
return;
clock_type::time_point insaneTime;
{
std::lock_guard<std::mutex> sl(recentLock_);
insaneTime = insaneTime_;
}
bool reject = false;
if (sanity_.load() == Sanity::insane)
reject = (insaneTime - clock_type::now())
> std::chrono::seconds (Tuning::maxInsaneTime);
if (sanity_.load() == Sanity::unknown)
reject = (insaneTime - clock_type::now())
> std::chrono::seconds (Tuning::maxUnknownTime);
if (reject)
{
overlay_.peerFinder().on_failure (slot_);
strand_.post (std::bind (
(void (PeerImp::*)(std::string const&)) &PeerImp::fail,
shared_from_this(), "Not useful"));
}
} }
void void
@@ -1192,6 +1305,11 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMValidation> const& m)
} }
bool isTrusted = getApp().getUNL ().nodeInUNL (val->getSignerPublic ()); bool isTrusted = getApp().getUNL ().nodeInUNL (val->getSignerPublic ());
if (!isTrusted && (sanity_.load () == Sanity::insane))
{
p_journal_.debug <<
"Validation: dropping untrusted from insane peer";
}
if (isTrusted || !getApp().getFeeTrack ().isLoadedLocal ()) if (isTrusted || !getApp().getFeeTrack ().isLoadedLocal ())
{ {
getApp().getJobQueue ().addJob (isTrusted ? getApp().getJobQueue ().addJob (isTrusted ?

View File

@@ -80,6 +80,13 @@ public:
,active ,active
}; };
/** Verdict on whether a peer's ledger sequence tracks the network.
    Enumerator order is significant only in that it is unchanged.
*/
enum class Sanity
{
    /** Ledger sequence is far away from a recently validated one. */
    insane,

    /** No verdict yet; this is the state a peer is constructed in. */
    unknown,

    /** Ledger sequence is close to a recently validated one. */
    sane
};
typedef std::shared_ptr <PeerImp> ptr; typedef std::shared_ptr <PeerImp> ptr;
private: private:
@@ -116,6 +123,8 @@ private:
OverlayImpl& overlay_; OverlayImpl& overlay_;
bool m_inbound; bool m_inbound;
State state_; // Current state State state_; // Current state
std::atomic<Sanity> sanity_;
clock_type::time_point insaneTime_;
bool detaching_ = false; bool detaching_ = false;
// Node public key of peer. // Node public key of peer.
RippleAddress publicKey_; RippleAddress publicKey_;
@@ -227,6 +236,18 @@ public:
return slot_->cluster(); return slot_->cluster();
} }
/** Decide whether this connection should be rejected and treated as a
    failure. Inbound connections and sane peers are left alone.
*/
void
check();
/** Check if the peer is sane
@param validationSeq The ledger sequence of a recently-validated ledger
*/
void
checkSanity (std::uint32_t validationSeq);
/** Update the sanity status from the distance between two ledger
    sequences.
@param seq1 One ledger sequence
@param seq2 The ledger sequence to compare it with
*/
void
checkSanity (std::uint32_t seq1, std::uint32_t seq2);
RippleAddress const& RippleAddress const&
getNodePublic () const override getNodePublic () const override
{ {
@@ -445,6 +466,8 @@ PeerImp::PeerImp (std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
, overlay_ (overlay) , overlay_ (overlay)
, m_inbound (false) , m_inbound (false)
, state_ (State::active) , state_ (State::active)
, sanity_ (Sanity::unknown)
, insaneTime_ (clock_type::now())
, publicKey_ (legacyPublicKey) , publicKey_ (legacyPublicKey)
, hello_ (std::move(hello)) , hello_ (std::move(hello))
, usage_ (usage) , usage_ (usage)

View File

@@ -28,7 +28,26 @@ namespace Tuning
enum enum
{ {
/** Size of buffer used to read from the socket. */ /** Size of buffer used to read from the socket. */
readBufferBytes = 4096 readBufferBytes = 4096,
/** How long a server can remain insane before we
disconnect it (if outbound) */
maxInsaneTime = 60,
/** How long a server can remain unknown before we
disconnect it (if outbound) */
maxUnknownTime = 300,
/** How many ledgers off a server can be and we will
still consider it sane */
saneLedgerLimit = 24,
/** How many ledgers off a server has to be before we
consider it insane */
insaneLedgerLimit = 128,
/** How often we check connections (seconds) */
checkSeconds = 10,
}; };
} // Tuning } // Tuning

View File

@@ -196,6 +196,9 @@ public:
*/ */
virtual void on_closed (Slot::ptr const& slot) = 0; virtual void on_closed (Slot::ptr const& slot) = 0;
/** Called when an outbound connection is deemed to have failed */
virtual void on_failure (Slot::ptr const& slot) = 0;
/** Called when we received redirect IPs from a busy peer. */ /** Called when we received redirect IPs from a busy peer. */
virtual virtual
void void

View File

@@ -947,6 +947,13 @@ public:
} }
} }
/** Report a failed connection for the slot's remote endpoint to the
    bootcache.
    @param slot the slot whose remote endpoint is reported; it is
                dereferenced unconditionally
*/
void on_failure (SlotImp::ptr const& slot)
{
// NOTE(review): presumably Access provides synchronized access to the
// shared state for the lifetime of `state` - confirm.
typename SharedState::Access state (m_state);
state->bootcache.on_failure (slot->remote_endpoint ());
}
// Insert a set of redirect IP addresses into the Bootcache // Insert a set of redirect IP addresses into the Bootcache
template <class FwdIter> template <class FwdIter>
void void

View File

@@ -154,6 +154,13 @@ public:
m_logic.on_closed (impl); m_logic.on_closed (impl);
} }
/** Forward a connection failure for the given slot to the logic object.
    @param slot the slot to report; it is down-cast to SlotImp first
*/
void
on_failure (Slot::ptr const& slot) override
{
// NOTE(review): the cast result is passed on without a null check -
// assumes every Slot handed in here is a SlotImp; confirm.
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
m_logic.on_failure (impl);
}
void void
onRedirects (boost::asio::ip::tcp::endpoint const& remote_address, onRedirects (boost::asio::ip::tcp::endpoint const& remote_address,
std::vector<boost::asio::ip::tcp::endpoint> const& eps) override std::vector<boost::asio::ip::tcp::endpoint> const& eps) override

View File

@@ -293,6 +293,7 @@ JSS ( result ); // RPC
JSS ( ripple_lines ); // out: NetworkOPs JSS ( ripple_lines ); // out: NetworkOPs
JSS ( ripple_state ); // in: LedgerEntr JSS ( ripple_state ); // in: LedgerEntr
JSS ( rt_accounts ); // in: Subscribe, Unsubscribe JSS ( rt_accounts ); // in: Subscribe, Unsubscribe
JSS ( sanity ); // out: PeerImp
JSS ( search_depth ); // in: RipplePathFind JSS ( search_depth ); // in: RipplePathFind
JSS ( secret ); // in: TransactionSign, WalletSeed, JSS ( secret ); // in: TransactionSign, WalletSeed,
// ValidationCreate, ValidationSeed // ValidationCreate, ValidationSeed