From dc24748c24cc2ca8412735a895c3d517bd97a7f5 Mon Sep 17 00:00:00 2001 From: Scott Schurr Date: Mon, 29 Apr 2019 11:20:09 -0700 Subject: [PATCH] Improve locking of PeerImp member variables --- src/ripple/overlay/impl/PeerImp.cpp | 347 +++++++++++++++++----------- src/ripple/overlay/impl/PeerImp.h | 54 ++++- 2 files changed, 263 insertions(+), 138 deletions(-) diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 11f20396b4..224001ffba 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -88,14 +88,25 @@ PeerImp::PeerImp (Application& app, id_t id, endpoint_type remote_endpoint, PeerImp::~PeerImp () { - if (cluster()) - { - JLOG(journal_.warn()) << name_ << " left cluster"; - } + const bool inCluster {cluster()}; + if (state_ == State::active) overlay_.onPeerDeactivate(id_); overlay_.peerFinder().on_closed (slot_); overlay_.remove (slot_); + + if (inCluster) + { + JLOG(journal_.warn()) << getName() << " left cluster"; + } +} + +// Helper function to check for valid uint256 values in protobuf buffers +static +bool +stringIsUint256Sized (std::string const& pBuffStr) +{ + return pBuffStr.size() == uint256::size(); } void @@ -113,17 +124,21 @@ PeerImp::run() assert (state_ == State::active); // XXX Set timer: connection is in grace period to be useful. // XXX Set timer: connection idle (idle may vary depending on connection type.) - if ((hello_.has_ledgerclosed ()) && ( - hello_.ledgerclosed ().size () == (256 / 8))) + if (hello_.has_ledgerclosed() && + stringIsUint256Sized (hello_.ledgerclosed())) { - memcpy (closedLedgerHash_.begin (), - hello_.ledgerclosed ().data (), 256 / 8); - if ((hello_.has_ledgerprevious ()) && - (hello_.ledgerprevious ().size () == (256 / 8))) + // Operations on closedLedgerHash_ and previousLedgerHash_ must be + // guarded by recentLock_. + std::lock_guard sl(recentLock_); + + closedLedgerHash_ = hello_.ledgerclosed(); + + if (hello_.has_ledgerprevious() && + stringIsUint256Sized (hello_.ledgerprevious())) { - memcpy (previousLedgerHash_.begin (), - hello_.ledgerprevious ().data (), 256 / 8); - addLedger (previousLedgerHash_); + previousLedgerHash_ = hello_.ledgerprevious(); + + addLedger (previousLedgerHash_, sl); } else { @@ -190,10 +205,12 @@ PeerImp::send (Message::pointer const& m) // a small senq periodically large_sendq_ = 0; } - else if ((sendq_size % Tuning::sendQueueLogFreq) == 0) + else if (journal_.active (beast::severities::kDebug) && + (sendq_size % Tuning::sendQueueLogFreq) == 0) { + std::string const name {getName()}; JLOG (journal_.debug()) << - (name_.empty() ? remote_address_.to_string() : name_) << + (name.empty() ? remote_address_.to_string() : name) << " sendq: " << sendq_size; } @@ -262,8 +279,9 @@ PeerImp::json() { ret[jss::cluster] = true; - if (!name_.empty ()) - ret[jss::name] = name_; + std::string name {getName()}; + if (!name.empty ()) + ret[jss::name] = std::move(name); } ret[jss::load] = usage_.balance (); @@ -295,9 +313,6 @@ PeerImp::json() ret[jss::complete_ledgers] = std::to_string(minSeq) + " - " + std::to_string(maxSeq); - if (closedLedgerHash_ != beast::zero) - ret[jss::ledger] = to_string (closedLedgerHash_); - switch (sanity_.load ()) { case Sanity::insane: @@ -313,9 +328,20 @@ PeerImp::json() break; } - if (last_status_.has_newstatus ()) + uint256 closedLedgerHash; + protocol::TMStatusChange last_status; { - switch (last_status_.newstatus ()) + std::lock_guard sl(recentLock_); + closedLedgerHash = closedLedgerHash_; + last_status = last_status_; + } + + if (closedLedgerHash != beast::zero) + ret[jss::ledger] = to_string (closedLedgerHash); + + if (last_status.has_newstatus ()) + { + switch (last_status.newstatus ()) { case protocol::nsCONNECTING: ret[jss::status] = "connecting"; @@ -338,9 +364,8 @@ PeerImp::json() break; default: - // FIXME: do we really want this? JLOG(p_journal_.warn()) << - "Unknown status: " << last_status_.newstatus (); + "Unknown status: " << last_status.newstatus (); } } @@ -397,6 +422,9 @@ PeerImp::hasTxSet (uint256 const& hash) const void PeerImp::cycleStatus () { + // Operations on closedLedgerHash_ and previousLedgerHash_ must be + // guarded by recentLock_. + std::lock_guard sl(recentLock_); previousLedgerHash_ = closedLedgerHash_; closedLedgerHash_.zero (); } @@ -410,7 +438,10 @@ PeerImp::supportsVersion (int version) bool PeerImp::hasRange (std::uint32_t uMin, std::uint32_t uMax) { - return (sanity_ != Sanity::insane) && (uMin >= minLedger_) && (uMax <= maxLedger_); + std::lock_guard sl(recentLock_); + return (sanity_ != Sanity::insane) && + (uMin >= minLedger_) && + (uMax <= maxLedger_); } //------------------------------------------------------------------------------ @@ -447,10 +478,11 @@ PeerImp::fail(std::string const& reason) (void (Peer::*)(std::string const&)) & PeerImp::fail, shared_from_this(), reason)); - if (socket_.is_open()) + if (journal_.active (beast::severities::kWarning) && socket_.is_open()) { + std::string const name {getName()}; JLOG (journal_.warn()) << - (name_.empty() ? remote_address_.to_string() : name_) << + (name.empty() ? remote_address_.to_string() : name) << " failed: " << reason; } close(); @@ -568,36 +600,49 @@ PeerImp::onTimer (error_code const& ec) return; } - if (no_ping_++ >= Tuning::noPing) + bool failedNoPing {false}; + boost::optional pingSeq; + // Operations on lastPingSeq_, lastPingTime_, no_ping_, and latency_ + // must be guarded by recentLock_. + { + std::lock_guard sl(recentLock_); + if (no_ping_++ >= Tuning::noPing) + { + failedNoPing = true; + } + else if (!lastPingSeq_) + { + // Make the sequence unpredictable enough to prevent guessing + lastPingSeq_ = rand_int(); + lastPingTime_ = clock_type::now(); + pingSeq = lastPingSeq_; + } + else + { + // We have an outstanding ping, raise latency + auto const minLatency = + std::chrono::duration_cast + (clock_type::now() - lastPingTime_); + + if (latency_ < minLatency) + latency_ = minLatency; + } + } + + if (failedNoPing) { fail ("No ping reply received"); return; } - if (!lastPingSeq_) + if (pingSeq) { - // Make the sequence unpredictable enough to prevent guessing - lastPingSeq_ = rand_int(); - lastPingTime_ = clock_type::now(); - protocol::TMPing message; message.set_type (protocol::TMPing::ptPING); - message.set_seq (*lastPingSeq_); + message.set_seq (*pingSeq); send (std::make_shared (message, protocol::mtPING)); } - else - { - // We have an outstanding ping, raise latency - auto const minLatency = - std::chrono::duration_cast - (clock_type::now() - lastPingTime_); - - std::lock_guard sl(recentLock_); - - if (latency_ < minLatency) - latency_ = minLatency; - } setTimer(); } @@ -647,25 +692,31 @@ void PeerImp::doAccept() publicKey_); if (auto member = app_.cluster().member(publicKey_)) { - name_ = *member; - JLOG(journal_.info()) << "Cluster name: " << name_; + { + std::unique_lock lock{nameMutex_}; + name_ = *member; + } + JLOG(journal_.info()) << "Cluster name: " << *member; } overlay_.activate(shared_from_this()); // XXX Set timer: connection is in grace period to be useful. // XXX Set timer: connection idle (idle may vary depending on connection type.) - if ((hello_.has_ledgerclosed ()) && ( - hello_.ledgerclosed ().size () == (256 / 8))) + if (hello_.has_ledgerclosed() && + stringIsUint256Sized (hello_.ledgerclosed())) { - memcpy (closedLedgerHash_.begin (), - hello_.ledgerclosed ().data (), 256 / 8); - if ((hello_.has_ledgerprevious ()) && - (hello_.ledgerprevious ().size () == (256 / 8))) + // Operations on closedLedgerHash_ and previousLedgerHash_ must be + // guarded by recentLock_. + std::lock_guard sl(recentLock_); + + closedLedgerHash_ = hello_.ledgerclosed(); + + if (hello_.has_ledgerprevious() && + stringIsUint256Sized (hello_.ledgerprevious())) { - memcpy (previousLedgerHash_.begin (), - hello_.ledgerprevious ().data (), 256 / 8); - addLedger (previousLedgerHash_); + previousLedgerHash_ = hello_.ledgerprevious(); + addLedger (previousLedgerHash_, sl); } else { @@ -730,6 +781,13 @@ PeerImp::onWriteResponse (error_code ec, std::size_t bytes_transferred) std::placeholders::_2))); } +std::string +PeerImp::getName() const +{ + std::shared_lock read_lock{nameMutex_}; + return name_; +} + //------------------------------------------------------------------------------ // Protocol logic @@ -922,6 +980,10 @@ PeerImp::onMessage (std::shared_ptr const& m) if (m->type () == protocol::TMPing::ptPONG) { + // Operations on lastPingSeq_, lastPingTime_, no_ping_, and latency_ + // must be guarded by recentLock_. + std::lock_guard sl(recentLock_); + if (m->has_seq() && m->seq() == lastPingSeq_) { no_ping_ = 0; @@ -932,8 +994,6 @@ PeerImp::onMessage (std::shared_ptr const& m) lastPingSeq_.reset(); // Update latency estimate - std::lock_guard sl(recentLock_); - auto const estimate = std::chrono::duration_cast (clock_type::now() - lastPingTime_); @@ -1000,7 +1060,7 @@ PeerImp::onMessage (std::shared_ptr const& m) if (item.address != beast::IP::Endpoint()) gossip.items.push_back(item); } - overlay_.resourceManager().importConsumers (name_, gossip); + overlay_.resourceManager().importConsumers (getName(), gossip); } // Calculate the cluster fee: @@ -1518,16 +1578,14 @@ PeerImp::onMessage (std::shared_ptr const& m) return; } - uint256 hash; - - if (m->ledgerhash ().size () != 32) + if (! stringIsUint256Sized (m->ledgerhash())) { JLOG(p_journal_.warn()) << "TX candidate reply with invalid hash size"; fee_ = Resource::feeInvalidRequest; return; } - memcpy (hash.begin (), m->ledgerhash ().data (), 32); + uint256 const hash {m->ledgerhash()}; if (m->type () == protocol::liTS_CANDIDATE) { @@ -1571,7 +1629,8 @@ PeerImp::onMessage (std::shared_ptr const& m) return; } - if (set.currenttxhash().size() != 32 || set.previousledger().size() != 32) + if (! stringIsUint256Sized (set.currenttxhash()) || + ! stringIsUint256Sized (set.previousledger())) { JLOG(p_journal_.warn()) << "Proposal: malformed"; fee_ = Resource::feeInvalidRequest; @@ -1643,49 +1702,81 @@ PeerImp::onMessage (std::shared_ptr const& m) if (!m->has_networktime ()) m->set_networktime (app_.timeKeeper().now().time_since_epoch().count()); - if (!last_status_.has_newstatus () || m->has_newstatus ()) - last_status_ = *m; - else { - // preserve old status - protocol::NodeStatus status = last_status_.newstatus (); - last_status_ = *m; - m->set_newstatus (status); + std::lock_guard sl(recentLock_); + if (!last_status_.has_newstatus () || m->has_newstatus ()) + last_status_ = *m; + else + { + // preserve old status + protocol::NodeStatus status = last_status_.newstatus (); + last_status_ = *m; + m->set_newstatus (status); + } } if (m->newevent () == protocol::neLOST_SYNC) { - if (!closedLedgerHash_.isZero ()) + bool outOfSync {false}; + { + // Operations on closedLedgerHash_ and previousLedgerHash_ must be + // guarded by recentLock_. + std::lock_guard sl(recentLock_); + if (!closedLedgerHash_.isZero ()) + { + outOfSync = true; + closedLedgerHash_.zero (); + } + previousLedgerHash_.zero (); + } + if (outOfSync) { JLOG(p_journal_.debug()) << "Status: Out of sync"; - closedLedgerHash_.zero (); } - - previousLedgerHash_.zero (); return; } - if (m->has_ledgerhash () && (m->ledgerhash ().size () == (256 / 8))) { - // a peer has changed ledgers - memcpy (closedLedgerHash_.begin (), m->ledgerhash ().data (), 256 / 8); - addLedger (closedLedgerHash_); - JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash_; - } - else - { - JLOG(p_journal_.debug()) << "Status: No ledger"; - closedLedgerHash_.zero (); + uint256 closedLedgerHash {}; + bool const peerChangedLedgers { + m->has_ledgerhash() && stringIsUint256Sized (m->ledgerhash())}; + + { + // Operations on closedLedgerHash_ and previousLedgerHash_ must be + // guarded by recentLock_. + std::lock_guard sl(recentLock_); + if (peerChangedLedgers) + { + closedLedgerHash_ = m->ledgerhash(); + closedLedgerHash = closedLedgerHash_; + addLedger (closedLedgerHash, sl); + } + else + { + closedLedgerHash_.zero(); + } + + if (m->has_ledgerhashprevious() && + stringIsUint256Sized (m->ledgerhashprevious())) + { + previousLedgerHash_ = m->ledgerhashprevious(); + addLedger (previousLedgerHash_, sl); + } + else + { + previousLedgerHash_.zero (); + } + } + if (peerChangedLedgers) + { + JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash; + } + else + { + JLOG(p_journal_.debug()) << "Status: No ledger"; + } } - if (m->has_ledgerhashprevious () && - m->ledgerhashprevious ().size () == (256 / 8)) - { - memcpy (previousLedgerHash_.begin (), - m->ledgerhashprevious ().data (), 256 / 8); - addLedger (previousLedgerHash_); - } - else previousLedgerHash_.zero (); if (m->has_firstseq () && m->has_lastseq()) { @@ -1757,7 +1848,12 @@ PeerImp::onMessage (std::shared_ptr const& m) if (m->has_ledgerhash ()) { - j[jss::ledger_hash] = to_string (closedLedgerHash_); + uint256 closedLedgerHash {}; + { + std::lock_guard sl(recentLock_); + closedLedgerHash = closedLedgerHash_; + } + j[jss::ledger_hash] = to_string (closedLedgerHash); } if (m->has_networktime ()) @@ -1856,20 +1952,13 @@ void PeerImp::check () void PeerImp::onMessage (std::shared_ptr const& m) { - uint256 hashes; - - if (m->hash ().size () != (256 / 8)) + if (! stringIsUint256Sized (m->hash())) { fee_ = Resource::feeInvalidRequest; return; } - uint256 hash; - - // VFALCO TODO There should be no use of memcpy() throughout the program. - // TODO Clean up this magic number - // - memcpy (hash.begin (), m->hash ().data (), 32); + uint256 const hash {m->hash()}; if (m->status () == protocol::tsHAVE) { @@ -2009,14 +2098,13 @@ PeerImp::onMessage (std::shared_ptr const& m) for (int i = 0; i < packet.objects_size (); ++i) { auto const& obj = packet.objects (i); - if (obj.has_hash () && (obj.hash ().size () == (256 / 8))) + if (obj.has_hash() && stringIsUint256Sized (obj.hash())) { - uint256 hash; - memcpy (hash.begin (), obj.hash ().data (), 256 / 8); + uint256 const hash {obj.hash()}; // VFALCO TODO Move this someplace more sensible so we dont // need to inject the NodeStore interfaces. std::uint32_t seq {obj.has_ledgerseq() ? obj.ledgerseq() : 0}; - auto hObj {app_.getNodeStore ().fetch (hash, seq)}; + auto hObj {app_.getNodeStore().fetch (hash, seq)}; if (!hObj) { if (auto shardStore = app_.getShardStore()) @@ -2054,23 +2142,23 @@ PeerImp::onMessage (std::shared_ptr const& m) bool pLDo = true; bool progress = false; - for (int i = 0; i < packet.objects_size (); ++i) + for (int i = 0; i < packet.objects_size(); ++i) { const protocol::TMIndexedObject& obj = packet.objects (i); - if (obj.has_hash () && (obj.hash ().size () == (256 / 8))) + if (obj.has_hash() && stringIsUint256Sized (obj.hash())) { - if (obj.has_ledgerseq ()) + if (obj.has_ledgerseq()) { - if (obj.ledgerseq () != pLSeq) + if (obj.ledgerseq() != pLSeq) { if (pLDo && (pLSeq != 0)) { JLOG(p_journal_.debug()) << "GetObj: Full fetch pack for " << pLSeq; } - pLSeq = obj.ledgerseq (); - pLDo = !app_.getLedgerMaster ().haveLedger (pLSeq); + pLSeq = obj.ledgerseq(); + pLDo = !app_.getLedgerMaster().haveLedger (pLSeq); if (!pLDo) { @@ -2084,14 +2172,13 @@ PeerImp::onMessage (std::shared_ptr const& m) if (pLDo) { - uint256 hash; - memcpy (hash.begin (), obj.hash ().data (), 256 / 8); + uint256 const hash {obj.hash()}; std::shared_ptr< Blob > data ( std::make_shared< Blob > ( - obj.data ().begin (), obj.data ().end ())); + obj.data().begin(), obj.data().end())); - app_.getLedgerMaster ().addFetchPack (hash, data); + app_.getLedgerMaster().addFetchPack (hash, data); } } } @@ -2109,9 +2196,12 @@ PeerImp::onMessage (std::shared_ptr const& m) //-------------------------------------------------------------------------- void -PeerImp::addLedger (uint256 const& hash) +PeerImp::addLedger (uint256 const& hash, + std::lock_guard const& lockedRecentLock) { - std::lock_guard sl(recentLock_); + // lockedRecentLock is passed as a reminder that recentLock_ must be + // locked by the caller. + (void) lockedRecentLock; if (std::find (recentLedgers_.begin(), recentLedgers_.end(), hash) != recentLedgers_.end()) @@ -2139,7 +2229,7 @@ PeerImp::doFetchPack (const std::shared_ptr& packet return; } - if (packet->ledgerhash ().size () != 32) + if (! stringIsUint256Sized (packet->ledgerhash())) { JLOG(p_journal_.warn()) << "FetchPack hash size malformed"; fee_ = Resource::feeInvalidRequest; @@ -2148,8 +2238,7 @@ PeerImp::doFetchPack (const std::shared_ptr& packet fee_ = Resource::feeHighBurdenPeer; - uint256 hash; - memcpy (hash.begin (), packet->ledgerhash ().data (), 32); + uint256 const hash {packet->ledgerhash()}; std::weak_ptr weak = shared_from_this(); auto elapsed = UptimeClock::now(); @@ -2385,15 +2474,15 @@ PeerImp::getLedger (std::shared_ptr const& m) // Request is for a transaction candidate set JLOG(p_journal_.trace()) << "GetLedger: Tx candidate set"; - if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32)) + if (!packet.has_ledgerhash() || + !stringIsUint256Sized (packet.ledgerhash())) { charge (Resource::feeInvalidRequest); JLOG(p_journal_.warn()) << "GetLedger: Tx candidate set invalid"; return; } - uint256 txHash; - memcpy (txHash.begin (), packet.ledgerhash ().data (), 32); + uint256 const txHash {packet.ledgerhash()}; shared = app_.getInboundTransactions().getSet (txHash, false); map = shared.get(); @@ -2447,16 +2536,14 @@ PeerImp::getLedger (std::shared_ptr const& m) if (packet.has_ledgerhash ()) { - uint256 ledgerhash; - - if (packet.ledgerhash ().size () != 32) + if (! stringIsUint256Sized (packet.ledgerhash())) { charge (Resource::feeInvalidRequest); JLOG(p_journal_.warn()) << "GetLedger: Invalid request"; return; } - memcpy (ledgerhash.begin (), packet.ledgerhash ().data (), 32); + uint256 const ledgerhash {packet.ledgerhash()}; logMe += "LedgerHash:"; logMe += to_string (ledgerhash); ledger = app_.getLedgerMaster ().getLedgerByHash (ledgerhash); diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 69a5a64fb7..6830500f78 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -38,6 +38,7 @@ #include #include #include +#include namespace ripple { @@ -48,7 +49,7 @@ class PeerImp { public: /** Type of connection. - The affects how messages are routed. + This affects how messages are routed. */ enum class Type { @@ -121,15 +122,15 @@ private: // These are up here to prevent warnings about order of initializations // OverlayImpl& overlay_; - bool m_inbound; + bool const m_inbound; State state_; // Current state std::atomic sanity_; clock_type::time_point insaneTime_; bool detaching_ = false; // Node public key of peer. - PublicKey publicKey_; + PublicKey const publicKey_; std::string name_; - uint256 sharedValue_; + std::shared_timed_mutex mutable nameMutex_; // The indices of the smallest and largest ledgers this peer has available // @@ -143,14 +144,44 @@ private: boost::optional latency_; boost::optional lastPingSeq_; clock_type::time_point lastPingTime_; - clock_type::time_point creationTime_; + clock_type::time_point const creationTime_; + + // Notes on thread locking: + // + // During an audit it was noted that some member variables that looked + // like they need thread protection were not receiving it. And, indeed, + // that was correct. But the multi-phase initialization of PeerImp + // makes such an audit difficult. A further audit suggests that the + // locking is now protecting variables that don't need it. We're + // leaving that locking in place (for now) as a form of future proofing. + // + // Here are the variables that appear to need locking currently: + // + // o closedLedgerHash_ + // o previousLedgerHash_ + // o minLedger_ + // o maxLedger_ + // o recentLedgers_ + // o recentTxSets_ + // o insaneTime_ + // o latency_ + // + // The following variables are being protected preemptively: + // + // o name_ + // o last_status_ + // o lastPingSeq_ + // o lastPingTime_ + // o no_ping_ + // + // June 2019 std::mutex mutable recentLock_; protocol::TMStatusChange last_status_; - protocol::TMHello hello_; + protocol::TMHello const hello_; Resource::Consumer usage_; Resource::Charge fee_; - PeerFinder::Slot::ptr slot_; + PeerFinder::Slot::ptr const slot_; boost::beast::multi_buffer read_buffer_; http_request_type request_; http_response_type response_; @@ -375,6 +406,10 @@ private: void onWriteResponse (error_code ec, std::size_t bytes_transferred); + // A thread-safe way of getting the name. + std::string + getName() const; + // // protocol message loop // @@ -451,8 +486,11 @@ private: //-------------------------------------------------------------------------- + // lockedRecentLock is passed as a reminder to callers that recentLock_ + // must be locked. void - addLedger (uint256 const& hash); + addLedger (uint256 const& hash, + std::lock_guard const& lockedRecentLock); void doFetchPack (const std::shared_ptr& packet);