From 0c134582ca4173b7451adf28845db140f4959c2b Mon Sep 17 00:00:00 2001 From: David Schwartz Date: Fri, 3 Apr 2015 12:32:06 -0700 Subject: [PATCH] Track peer "sanity" (RIPD-836) * Each peer has a "sane/insane/unknown" status * Status updated based on peer ledger sequence * Status reported in peer json * Only sane peers preferred for historical ledgers * Overlay endpoints only accepted from known sane peers * Untrusted proposals not relayed from insane peers * Untrusted validations not relayed from insane peers * Transactions from insane peers are not processed * Periodically drop outbound connections to bad peers * Bad peers get bootcache valence of zero Peer "sanity" is based on the ledger sequence number they are on. We quickly become able to assess this based on current trusted validations. We quarrantine rogue messages and disconnect bad outbound connections to help maintain the configured number of good outbound connections. --- src/ripple/app/ledger/LedgerMaster.cpp | 13 ++- src/ripple/overlay/Overlay.h | 13 +++ src/ripple/overlay/impl/OverlayImpl.cpp | 31 +++++- src/ripple/overlay/impl/OverlayImpl.h | 8 ++ src/ripple/overlay/impl/PeerImp.cpp | 120 +++++++++++++++++++++++- src/ripple/overlay/impl/PeerImp.h | 23 +++++ src/ripple/overlay/impl/Tuning.h | 21 ++++- src/ripple/peerfinder/Manager.h | 3 + src/ripple/peerfinder/impl/Logic.h | 7 ++ src/ripple/peerfinder/impl/Manager.cpp | 7 ++ src/ripple/protocol/JsonFields.h | 1 + 11 files changed, 241 insertions(+), 6 deletions(-) diff --git a/src/ripple/app/ledger/LedgerMaster.cpp b/src/ripple/app/ledger/LedgerMaster.cpp index 4a8b07fea..574601530 100644 --- a/src/ripple/app/ledger/LedgerMaster.cpp +++ b/src/ripple/app/ledger/LedgerMaster.cpp @@ -710,9 +710,18 @@ public: if (!ledger) { - // FIXME: We should really only fetch if the ledger - //has sufficient validations to accept it + if ((seq != 0) && (getValidLedgerIndex() == 0)) + { + // Set peers sane early if we can + if (getApp().getValidations().getTrustedValidationCount (hash) >= + mMinValidations) + { + getApp().overlay().checkSanity (seq); + } + } + // FIXME: We may not want to fetch a ledger with just one + // trusted validation InboundLedger::pointer l = getApp().getInboundLedgers().findCreate(hash, 0, InboundLedger::fcGENERIC); if (l && l->isComplete() && !l->isFailed()) diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index ce7ac030c..f55666fb6 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -113,6 +113,19 @@ public: PeerSequence getActivePeers () = 0; + /** Calls the checkSanity function on each peer + @param index the value to pass to the peer's checkSanity function + */ + virtual + void + checkSanity (std::uint32_t index) = 0; + + /** Calls the check function on each peer + */ + virtual + void + check () = 0; + /** Returns the peer with the matching short id, or null. */ virtual Peer::ptr diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index af7c93dfd..b7b492948 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -108,6 +108,9 @@ OverlayImpl::Timer::on_timer (error_code ec) overlay_.sendEndpoints(); overlay_.autoConnect(); + if ((++overlay_.timer_count_ % Tuning::checkSeconds) == 0) + overlay_.check(); + timer_.expires_from_now (std::chrono::seconds(1)); timer_.async_wait(overlay_.strand_.wrap(std::bind( &Timer::on_timer, shared_from_this(), @@ -136,6 +139,7 @@ OverlayImpl::OverlayImpl ( get_seconds_clock(), deprecatedLogs().journal("PeerFinder"), config)) , m_resolver (resolver) , next_id_(1) + , timer_count_(0) { beast::PropertyStream::Source::add (m_peerFinder.get()); } @@ -585,8 +589,7 @@ OverlayImpl::crawl() std::lock_guard lock (mutex_); for (auto const& e : m_publicKeyMap) { - auto const sp = e.second.lock(); - if (sp) + if (auto const sp = e.second.lock()) { auto& pv = av.append(Json::Value(Json::objectValue)); pv[jss::type] = "peer"; @@ -648,6 +651,30 @@ OverlayImpl::getActivePeers() return ret; } +void +OverlayImpl::checkSanity (std::uint32_t index) +{ + std::lock_guard lock (mutex_); + + for (auto const& e : m_publicKeyMap) + { + if (auto const sp = e.second.lock()) + sp->checkSanity (index); + } +} + +void +OverlayImpl::check () +{ + std::lock_guard lock (mutex_); + + for (auto const& e : m_publicKeyMap) + { + if (auto const sp = e.second.lock()) + sp->check (); + } +} + Peer::ptr OverlayImpl::findPeerByShortID (Peer::id_t const& id) { diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 45ce13dfd..8b2aed142 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -118,6 +118,8 @@ private: std::atomic next_id_; + int timer_count_; + //-------------------------------------------------------------------------- public: @@ -163,6 +165,12 @@ public: PeerSequence getActivePeers() override; + void + check () override; + + void + checkSanity (std::uint32_t) override; + Peer::ptr findPeerByShortID (Peer::id_t const& id) override; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index e52ed791d..8b5e7aaf4 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -67,6 +67,8 @@ PeerImp::PeerImp (id_t id, endpoint_type remote_endpoint, , overlay_ (overlay) , m_inbound (true) , state_ (State::active) + , sanity_ (Sanity::unknown) + , insaneTime_ (clock_type::now()) , publicKey_(publicKey) , hello_(hello) , usage_(consumer) @@ -251,6 +253,21 @@ PeerImp::json() if (closedLedgerHash_ != zero) ret[jss::ledger] = to_string (closedLedgerHash_); + switch (sanity_.load ()) + { + case Sanity::insane: + ret[jss::sanity] = "insane"; + break; + + case Sanity::unknown: + ret[jss::sanity] = "unknown"; + break; + + case Sanity::sane: + // Nothing to do here + break; + } + if (last_status_.has_newstatus ()) { switch (last_status_.newstatus ()) @@ -291,7 +308,8 @@ bool PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const { std::lock_guard sl(recentLock_); - if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_)) + if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) && + (sanity_.load() == Sanity::sane)) return true; return std::find (recentLedgers_.begin(), recentLedgers_.end(), hash) != recentLedgers_.end(); @@ -801,6 +819,12 @@ PeerImp::onMessage (std::shared_ptr const& m) void PeerImp::onMessage (std::shared_ptr const& m) { + if (sanity_.load() != Sanity::sane) + { + // Don't allow endpoints from peer not known sane + return; + } + std::vector endpoints; endpoints.reserve (m->endpoints().size()); @@ -844,6 +868,9 @@ void PeerImp::onMessage (std::shared_ptr const& m) { + if (sanity_.load() == Sanity::insane) + return; + if (getApp().getOPs().isNeedNetworkLedger ()) { // If we've never been in synch, there's nothing we can do @@ -1035,6 +1062,13 @@ PeerImp::onMessage (std::shared_ptr const& m) } bool isTrusted = getApp().getUNL ().nodeInUNL (signerPublic); + + if (!isTrusted && (sanity_.load() == Sanity::insane)) + { + p_journal_.debug << "Proposal: Dropping UNTRUSTED (insane)"; + return; + } + if (!isTrusted && getApp().getFeeTrack ().isLoadedLocal ()) { p_journal_.debug << "Proposal: Dropping UNTRUSTED (load)"; @@ -1119,6 +1153,85 @@ PeerImp::onMessage (std::shared_ptr const& m) if (maxLedger_ == 0) minLedger_ = 0; } + + if (m->has_ledgerseq() && + getApp().getLedgerMaster().getValidatedLedgerAge() < 120) + { + checkSanity (m->ledgerseq(), getApp().getLedgerMaster().getValidLedgerIndex()); + } +} + +void +PeerImp::checkSanity (std::uint32_t validationSeq) +{ + std::uint32_t serverSeq; + { + // Extract the seqeuence number of the highest + // ledger this peer has + std::lock_guard sl (recentLock_); + + serverSeq = maxLedger_; + } + if (serverSeq != 0) + { + // Compare the peer's ledger sequence to the + // sequence of a recently-validated ledger + checkSanity (serverSeq, validationSeq); + } +} + +void +PeerImp::checkSanity (std::uint32_t seq1, std::uint32_t seq2) +{ + int diff = std::max (seq1, seq2) - std::min (seq1, seq2); + + if (diff < Tuning::saneLedgerLimit) + { + // The peer's ledger sequence is close to the validation's + sanity_ = Sanity::sane; + } + + if ((diff > Tuning::insaneLedgerLimit) && (sanity_.load() != Sanity::insane)) + { + // The peer's ledger sequence is way off the validation's + std::lock_guard sl(recentLock_); + + sanity_ = Sanity::insane; + insaneTime_ = clock_type::now(); + } +} + +// Should this connection be rejected +// and considered a failure +void PeerImp::check () +{ + if (m_inbound || (sanity_.load() == Sanity::sane)) + return; + + clock_type::time_point insaneTime; + { + std::lock_guard sl(recentLock_); + + insaneTime = insaneTime_; + } + + bool reject = false; + + if (sanity_.load() == Sanity::insane) + reject = (insaneTime - clock_type::now()) + > std::chrono::seconds (Tuning::maxInsaneTime); + + if (sanity_.load() == Sanity::unknown) + reject = (insaneTime - clock_type::now()) + > std::chrono::seconds (Tuning::maxUnknownTime); + + if (reject) + { + overlay_.peerFinder().on_failure (slot_); + strand_.post (std::bind ( + (void (PeerImp::*)(std::string const&)) &PeerImp::fail, + shared_from_this(), "Not useful")); + } } void @@ -1192,6 +1305,11 @@ PeerImp::onMessage (std::shared_ptr const& m) } bool isTrusted = getApp().getUNL ().nodeInUNL (val->getSignerPublic ()); + if (!isTrusted && (sanity_.load () == Sanity::insane)) + { + p_journal_.debug << + "Validation: dropping untrusted from insane peer"; + } if (isTrusted || !getApp().getFeeTrack ().isLoadedLocal ()) { getApp().getJobQueue ().addJob (isTrusted ? diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 299186837..533d2aa93 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -80,6 +80,13 @@ public: ,active }; + enum class Sanity + { + insane + ,unknown + ,sane + }; + typedef std::shared_ptr ptr; private: @@ -116,6 +123,8 @@ private: OverlayImpl& overlay_; bool m_inbound; State state_; // Current state + std::atomic sanity_; + clock_type::time_point insaneTime_; bool detaching_ = false; // Node public key of peer. RippleAddress publicKey_; @@ -227,6 +236,18 @@ public: return slot_->cluster(); } + void + check(); + + /** Check if the peer is sane + @param validationSeq The ledger sequence of a recently-validated ledger + */ + void + checkSanity (std::uint32_t validationSeq); + + void + checkSanity (std::uint32_t seq1, std::uint32_t seq2); + RippleAddress const& getNodePublic () const override { @@ -445,6 +466,8 @@ PeerImp::PeerImp (std::unique_ptr&& ssl_bundle, , overlay_ (overlay) , m_inbound (false) , state_ (State::active) + , sanity_ (Sanity::unknown) + , insaneTime_ (clock_type::now()) , publicKey_ (legacyPublicKey) , hello_ (std::move(hello)) , usage_ (usage) diff --git a/src/ripple/overlay/impl/Tuning.h b/src/ripple/overlay/impl/Tuning.h index e084e8bf5..d3b49ada0 100644 --- a/src/ripple/overlay/impl/Tuning.h +++ b/src/ripple/overlay/impl/Tuning.h @@ -28,7 +28,26 @@ namespace Tuning enum { /** Size of buffer used to read from the socket. */ - readBufferBytes = 4096 + readBufferBytes = 4096, + + /** How long a server can remain insane before we + disconnected it (if outbound) */ + maxInsaneTime = 60, + + /** How long a server can remain unknown before we + disconnect it (if outbound) */ + maxUnknownTime = 300, + + /** How many ledgers off a server can be and we will + still consider it sane */ + saneLedgerLimit = 24, + + /** How many ledgers off a server has to be before we + consider it insane */ + insaneLedgerLimit = 128, + + /** How often we check connections (seconds) */ + checkSeconds = 10, }; } // Tuning diff --git a/src/ripple/peerfinder/Manager.h b/src/ripple/peerfinder/Manager.h index 7fada8eac..b6d38a64c 100644 --- a/src/ripple/peerfinder/Manager.h +++ b/src/ripple/peerfinder/Manager.h @@ -196,6 +196,9 @@ public: */ virtual void on_closed (Slot::ptr const& slot) = 0; + /** Called when an outbound connection is deemed to have failed */ + virtual void on_failure (Slot::ptr const& slot) = 0; + /** Called when we received redirect IPs from a busy peer. */ virtual void diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index 841f51b79..5c7b7f598 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -947,6 +947,13 @@ public: } } + void on_failure (SlotImp::ptr const& slot) + { + typename SharedState::Access state (m_state); + + state->bootcache.on_failure (slot->remote_endpoint ()); + } + // Insert a set of redirect IP addresses into the Bootcache template void diff --git a/src/ripple/peerfinder/impl/Manager.cpp b/src/ripple/peerfinder/impl/Manager.cpp index d8f1a2e8c..20d968da3 100644 --- a/src/ripple/peerfinder/impl/Manager.cpp +++ b/src/ripple/peerfinder/impl/Manager.cpp @@ -154,6 +154,13 @@ public: m_logic.on_closed (impl); } + void + on_failure (Slot::ptr const& slot) override + { + SlotImp::ptr impl (std::dynamic_pointer_cast (slot)); + m_logic.on_failure (impl); + } + void onRedirects (boost::asio::ip::tcp::endpoint const& remote_address, std::vector const& eps) override diff --git a/src/ripple/protocol/JsonFields.h b/src/ripple/protocol/JsonFields.h index 9fdd62b7c..ad2793d78 100644 --- a/src/ripple/protocol/JsonFields.h +++ b/src/ripple/protocol/JsonFields.h @@ -293,6 +293,7 @@ JSS ( result ); // RPC JSS ( ripple_lines ); // out: NetworkOPs JSS ( ripple_state ); // in: LedgerEntr JSS ( rt_accounts ); // in: Subscribe, Unsubscribe +JSS ( sanity ); // out: PeerImp JSS ( search_depth ); // in: RipplePathFind JSS ( secret ); // in: TransactionSign, WalletSeed, // ValidationCreate, ValidationSeed