Improve locking of PeerImp member variables

This commit is contained in:
Scott Schurr
2019-04-29 11:20:09 -07:00
committed by Manoj doshi
parent f8365f5009
commit dc24748c24
2 changed files with 263 additions and 138 deletions

View File

@@ -88,14 +88,25 @@ PeerImp::PeerImp (Application& app, id_t id, endpoint_type remote_endpoint,
PeerImp::~PeerImp ()
{
if (cluster())
{
JLOG(journal_.warn()) << name_ << " left cluster";
}
const bool inCluster {cluster()};
if (state_ == State::active)
overlay_.onPeerDeactivate(id_);
overlay_.peerFinder().on_closed (slot_);
overlay_.remove (slot_);
if (inCluster)
{
JLOG(journal_.warn()) << getName() << " left cluster";
}
}
// Helper function to check for valid uint256 values in protobuf buffers
static
bool
stringIsUint256Sized (std::string const& pBuffStr)
{
return pBuffStr.size() == uint256::size();
}
void
@@ -113,17 +124,21 @@ PeerImp::run()
assert (state_ == State::active);
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection type.)
if ((hello_.has_ledgerclosed ()) && (
hello_.ledgerclosed ().size () == (256 / 8)))
if (hello_.has_ledgerclosed() &&
stringIsUint256Sized (hello_.ledgerclosed()))
{
memcpy (closedLedgerHash_.begin (),
hello_.ledgerclosed ().data (), 256 / 8);
if ((hello_.has_ledgerprevious ()) &&
(hello_.ledgerprevious ().size () == (256 / 8)))
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
// guarded by recentLock_.
std::lock_guard<std::mutex> sl(recentLock_);
closedLedgerHash_ = hello_.ledgerclosed();
if (hello_.has_ledgerprevious() &&
stringIsUint256Sized (hello_.ledgerprevious()))
{
memcpy (previousLedgerHash_.begin (),
hello_.ledgerprevious ().data (), 256 / 8);
addLedger (previousLedgerHash_);
previousLedgerHash_ = hello_.ledgerprevious();
addLedger (previousLedgerHash_, sl);
}
else
{
@@ -190,10 +205,12 @@ PeerImp::send (Message::pointer const& m)
// a small senq periodically
large_sendq_ = 0;
}
else if ((sendq_size % Tuning::sendQueueLogFreq) == 0)
else if (journal_.active (beast::severities::kDebug) &&
(sendq_size % Tuning::sendQueueLogFreq) == 0)
{
std::string const name {getName()};
JLOG (journal_.debug()) <<
(name_.empty() ? remote_address_.to_string() : name_) <<
(name.empty() ? remote_address_.to_string() : name) <<
" sendq: " << sendq_size;
}
@@ -262,8 +279,9 @@ PeerImp::json()
{
ret[jss::cluster] = true;
if (!name_.empty ())
ret[jss::name] = name_;
std::string name {getName()};
if (!name.empty ())
ret[jss::name] = std::move(name);
}
ret[jss::load] = usage_.balance ();
@@ -295,9 +313,6 @@ PeerImp::json()
ret[jss::complete_ledgers] = std::to_string(minSeq) +
" - " + std::to_string(maxSeq);
if (closedLedgerHash_ != beast::zero)
ret[jss::ledger] = to_string (closedLedgerHash_);
switch (sanity_.load ())
{
case Sanity::insane:
@@ -313,9 +328,20 @@ PeerImp::json()
break;
}
if (last_status_.has_newstatus ())
uint256 closedLedgerHash;
protocol::TMStatusChange last_status;
{
switch (last_status_.newstatus ())
std::lock_guard<std::mutex> sl(recentLock_);
closedLedgerHash = closedLedgerHash_;
last_status = last_status_;
}
if (closedLedgerHash != beast::zero)
ret[jss::ledger] = to_string (closedLedgerHash);
if (last_status.has_newstatus ())
{
switch (last_status.newstatus ())
{
case protocol::nsCONNECTING:
ret[jss::status] = "connecting";
@@ -338,9 +364,8 @@ PeerImp::json()
break;
default:
// FIXME: do we really want this?
JLOG(p_journal_.warn()) <<
"Unknown status: " << last_status_.newstatus ();
"Unknown status: " << last_status.newstatus ();
}
}
@@ -397,6 +422,9 @@ PeerImp::hasTxSet (uint256 const& hash) const
void
PeerImp::cycleStatus ()
{
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
// guarded by recentLock_.
std::lock_guard<std::mutex> sl(recentLock_);
previousLedgerHash_ = closedLedgerHash_;
closedLedgerHash_.zero ();
}
@@ -410,7 +438,10 @@ PeerImp::supportsVersion (int version)
bool
PeerImp::hasRange (std::uint32_t uMin, std::uint32_t uMax)
{
return (sanity_ != Sanity::insane) && (uMin >= minLedger_) && (uMax <= maxLedger_);
std::lock_guard<std::mutex> sl(recentLock_);
return (sanity_ != Sanity::insane) &&
(uMin >= minLedger_) &&
(uMax <= maxLedger_);
}
//------------------------------------------------------------------------------
@@ -447,10 +478,11 @@ PeerImp::fail(std::string const& reason)
(void (Peer::*)(std::string const&)) & PeerImp::fail,
shared_from_this(),
reason));
if (socket_.is_open())
if (journal_.active (beast::severities::kWarning) && socket_.is_open())
{
std::string const name {getName()};
JLOG (journal_.warn()) <<
(name_.empty() ? remote_address_.to_string() : name_) <<
(name.empty() ? remote_address_.to_string() : name) <<
" failed: " << reason;
}
close();
@@ -568,36 +600,49 @@ PeerImp::onTimer (error_code const& ec)
return;
}
if (no_ping_++ >= Tuning::noPing)
bool failedNoPing {false};
boost::optional<std::uint32_t> pingSeq;
// Operations on lastPingSeq_, lastPingTime_, no_ping_, and latency_
// must be guarded by recentLock_.
{
std::lock_guard<std::mutex> sl(recentLock_);
if (no_ping_++ >= Tuning::noPing)
{
failedNoPing = true;
}
else if (!lastPingSeq_)
{
// Make the sequence unpredictable enough to prevent guessing
lastPingSeq_ = rand_int<std::uint32_t>();
lastPingTime_ = clock_type::now();
pingSeq = lastPingSeq_;
}
else
{
// We have an outstanding ping, raise latency
auto const minLatency =
std::chrono::duration_cast<std::chrono::milliseconds>
(clock_type::now() - lastPingTime_);
if (latency_ < minLatency)
latency_ = minLatency;
}
}
if (failedNoPing)
{
fail ("No ping reply received");
return;
}
if (!lastPingSeq_)
if (pingSeq)
{
// Make the sequence unpredictable enough to prevent guessing
lastPingSeq_ = rand_int<std::uint32_t>();
lastPingTime_ = clock_type::now();
protocol::TMPing message;
message.set_type (protocol::TMPing::ptPING);
message.set_seq (*lastPingSeq_);
message.set_seq (*pingSeq);
send (std::make_shared<Message> (message, protocol::mtPING));
}
else
{
// We have an outstanding ping, raise latency
auto const minLatency =
std::chrono::duration_cast<std::chrono::milliseconds>
(clock_type::now() - lastPingTime_);
std::lock_guard<std::mutex> sl(recentLock_);
if (latency_ < minLatency)
latency_ = minLatency;
}
setTimer();
}
@@ -647,25 +692,31 @@ void PeerImp::doAccept()
publicKey_);
if (auto member = app_.cluster().member(publicKey_))
{
name_ = *member;
JLOG(journal_.info()) << "Cluster name: " << name_;
{
std::unique_lock<std::shared_timed_mutex> lock{nameMutex_};
name_ = *member;
}
JLOG(journal_.info()) << "Cluster name: " << *member;
}
overlay_.activate(shared_from_this());
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection type.)
if ((hello_.has_ledgerclosed ()) && (
hello_.ledgerclosed ().size () == (256 / 8)))
if (hello_.has_ledgerclosed() &&
stringIsUint256Sized (hello_.ledgerclosed()))
{
memcpy (closedLedgerHash_.begin (),
hello_.ledgerclosed ().data (), 256 / 8);
if ((hello_.has_ledgerprevious ()) &&
(hello_.ledgerprevious ().size () == (256 / 8)))
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
// guarded by recentLock_.
std::lock_guard<std::mutex> sl(recentLock_);
closedLedgerHash_ = hello_.ledgerclosed();
if (hello_.has_ledgerprevious() &&
stringIsUint256Sized (hello_.ledgerprevious()))
{
memcpy (previousLedgerHash_.begin (),
hello_.ledgerprevious ().data (), 256 / 8);
addLedger (previousLedgerHash_);
previousLedgerHash_ = hello_.ledgerprevious();
addLedger (previousLedgerHash_, sl);
}
else
{
@@ -730,6 +781,13 @@ PeerImp::onWriteResponse (error_code ec, std::size_t bytes_transferred)
std::placeholders::_2)));
}
std::string
PeerImp::getName() const
{
std::shared_lock<std::shared_timed_mutex> read_lock{nameMutex_};
return name_;
}
//------------------------------------------------------------------------------
// Protocol logic
@@ -922,6 +980,10 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMPing> const& m)
if (m->type () == protocol::TMPing::ptPONG)
{
// Operations on lastPingSeq_, lastPingTime_, no_ping_, and latency_
// must be guarded by recentLock_.
std::lock_guard<std::mutex> sl(recentLock_);
if (m->has_seq() && m->seq() == lastPingSeq_)
{
no_ping_ = 0;
@@ -932,8 +994,6 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMPing> const& m)
lastPingSeq_.reset();
// Update latency estimate
std::lock_guard<std::mutex> sl(recentLock_);
auto const estimate =
std::chrono::duration_cast<std::chrono::milliseconds>
(clock_type::now() - lastPingTime_);
@@ -1000,7 +1060,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMCluster> const& m)
if (item.address != beast::IP::Endpoint())
gossip.items.push_back(item);
}
overlay_.resourceManager().importConsumers (name_, gossip);
overlay_.resourceManager().importConsumers (getName(), gossip);
}
// Calculate the cluster fee:
@@ -1518,16 +1578,14 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMLedgerData> const& m)
return;
}
uint256 hash;
if (m->ledgerhash ().size () != 32)
if (! stringIsUint256Sized (m->ledgerhash()))
{
JLOG(p_journal_.warn()) << "TX candidate reply with invalid hash size";
fee_ = Resource::feeInvalidRequest;
return;
}
memcpy (hash.begin (), m->ledgerhash ().data (), 32);
uint256 const hash {m->ledgerhash()};
if (m->type () == protocol::liTS_CANDIDATE)
{
@@ -1571,7 +1629,8 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMProposeSet> const& m)
return;
}
if (set.currenttxhash().size() != 32 || set.previousledger().size() != 32)
if (! stringIsUint256Sized (set.currenttxhash()) ||
! stringIsUint256Sized (set.previousledger()))
{
JLOG(p_journal_.warn()) << "Proposal: malformed";
fee_ = Resource::feeInvalidRequest;
@@ -1643,49 +1702,81 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
if (!m->has_networktime ())
m->set_networktime (app_.timeKeeper().now().time_since_epoch().count());
if (!last_status_.has_newstatus () || m->has_newstatus ())
last_status_ = *m;
else
{
// preserve old status
protocol::NodeStatus status = last_status_.newstatus ();
last_status_ = *m;
m->set_newstatus (status);
std::lock_guard<std::mutex> sl(recentLock_);
if (!last_status_.has_newstatus () || m->has_newstatus ())
last_status_ = *m;
else
{
// preserve old status
protocol::NodeStatus status = last_status_.newstatus ();
last_status_ = *m;
m->set_newstatus (status);
}
}
if (m->newevent () == protocol::neLOST_SYNC)
{
if (!closedLedgerHash_.isZero ())
bool outOfSync {false};
{
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
// guarded by recentLock_.
std::lock_guard<std::mutex> sl(recentLock_);
if (!closedLedgerHash_.isZero ())
{
outOfSync = true;
closedLedgerHash_.zero ();
}
previousLedgerHash_.zero ();
}
if (outOfSync)
{
JLOG(p_journal_.debug()) << "Status: Out of sync";
closedLedgerHash_.zero ();
}
previousLedgerHash_.zero ();
return;
}
if (m->has_ledgerhash () && (m->ledgerhash ().size () == (256 / 8)))
{
// a peer has changed ledgers
memcpy (closedLedgerHash_.begin (), m->ledgerhash ().data (), 256 / 8);
addLedger (closedLedgerHash_);
JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash_;
}
else
{
JLOG(p_journal_.debug()) << "Status: No ledger";
closedLedgerHash_.zero ();
uint256 closedLedgerHash {};
bool const peerChangedLedgers {
m->has_ledgerhash() && stringIsUint256Sized (m->ledgerhash())};
{
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
// guarded by recentLock_.
std::lock_guard<std::mutex> sl(recentLock_);
if (peerChangedLedgers)
{
closedLedgerHash_ = m->ledgerhash();
closedLedgerHash = closedLedgerHash_;
addLedger (closedLedgerHash, sl);
}
else
{
closedLedgerHash_.zero();
}
if (m->has_ledgerhashprevious() &&
stringIsUint256Sized (m->ledgerhashprevious()))
{
previousLedgerHash_ = m->ledgerhashprevious();
addLedger (previousLedgerHash_, sl);
}
else
{
previousLedgerHash_.zero ();
}
}
if (peerChangedLedgers)
{
JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash;
}
else
{
JLOG(p_journal_.debug()) << "Status: No ledger";
}
}
if (m->has_ledgerhashprevious () &&
m->ledgerhashprevious ().size () == (256 / 8))
{
memcpy (previousLedgerHash_.begin (),
m->ledgerhashprevious ().data (), 256 / 8);
addLedger (previousLedgerHash_);
}
else previousLedgerHash_.zero ();
if (m->has_firstseq () && m->has_lastseq())
{
@@ -1757,7 +1848,12 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
if (m->has_ledgerhash ())
{
j[jss::ledger_hash] = to_string (closedLedgerHash_);
uint256 closedLedgerHash {};
{
std::lock_guard<std::mutex> sl(recentLock_);
closedLedgerHash = closedLedgerHash_;
}
j[jss::ledger_hash] = to_string (closedLedgerHash);
}
if (m->has_networktime ())
@@ -1856,20 +1952,13 @@ void PeerImp::check ()
void
PeerImp::onMessage (std::shared_ptr <protocol::TMHaveTransactionSet> const& m)
{
uint256 hashes;
if (m->hash ().size () != (256 / 8))
if (! stringIsUint256Sized (m->hash()))
{
fee_ = Resource::feeInvalidRequest;
return;
}
uint256 hash;
// VFALCO TODO There should be no use of memcpy() throughout the program.
// TODO Clean up this magic number
//
memcpy (hash.begin (), m->hash ().data (), 32);
uint256 const hash {m->hash()};
if (m->status () == protocol::tsHAVE)
{
@@ -2009,14 +2098,13 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
for (int i = 0; i < packet.objects_size (); ++i)
{
auto const& obj = packet.objects (i);
if (obj.has_hash () && (obj.hash ().size () == (256 / 8)))
if (obj.has_hash() && stringIsUint256Sized (obj.hash()))
{
uint256 hash;
memcpy (hash.begin (), obj.hash ().data (), 256 / 8);
uint256 const hash {obj.hash()};
// VFALCO TODO Move this someplace more sensible so we dont
// need to inject the NodeStore interfaces.
std::uint32_t seq {obj.has_ledgerseq() ? obj.ledgerseq() : 0};
auto hObj {app_.getNodeStore ().fetch (hash, seq)};
auto hObj {app_.getNodeStore().fetch (hash, seq)};
if (!hObj)
{
if (auto shardStore = app_.getShardStore())
@@ -2054,23 +2142,23 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
bool pLDo = true;
bool progress = false;
for (int i = 0; i < packet.objects_size (); ++i)
for (int i = 0; i < packet.objects_size(); ++i)
{
const protocol::TMIndexedObject& obj = packet.objects (i);
if (obj.has_hash () && (obj.hash ().size () == (256 / 8)))
if (obj.has_hash() && stringIsUint256Sized (obj.hash()))
{
if (obj.has_ledgerseq ())
if (obj.has_ledgerseq())
{
if (obj.ledgerseq () != pLSeq)
if (obj.ledgerseq() != pLSeq)
{
if (pLDo && (pLSeq != 0))
{
JLOG(p_journal_.debug()) <<
"GetObj: Full fetch pack for " << pLSeq;
}
pLSeq = obj.ledgerseq ();
pLDo = !app_.getLedgerMaster ().haveLedger (pLSeq);
pLSeq = obj.ledgerseq();
pLDo = !app_.getLedgerMaster().haveLedger (pLSeq);
if (!pLDo)
{
@@ -2084,14 +2172,13 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
if (pLDo)
{
uint256 hash;
memcpy (hash.begin (), obj.hash ().data (), 256 / 8);
uint256 const hash {obj.hash()};
std::shared_ptr< Blob > data (
std::make_shared< Blob > (
obj.data ().begin (), obj.data ().end ()));
obj.data().begin(), obj.data().end()));
app_.getLedgerMaster ().addFetchPack (hash, data);
app_.getLedgerMaster().addFetchPack (hash, data);
}
}
}
@@ -2109,9 +2196,12 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
//--------------------------------------------------------------------------
void
PeerImp::addLedger (uint256 const& hash)
PeerImp::addLedger (uint256 const& hash,
std::lock_guard<std::mutex> const& lockedRecentLock)
{
std::lock_guard<std::mutex> sl(recentLock_);
// lockedRecentLock is passed as a reminder that recentLock_ must be
// locked by the caller.
(void) lockedRecentLock;
if (std::find (recentLedgers_.begin(),
recentLedgers_.end(), hash) != recentLedgers_.end())
@@ -2139,7 +2229,7 @@ PeerImp::doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet
return;
}
if (packet->ledgerhash ().size () != 32)
if (! stringIsUint256Sized (packet->ledgerhash()))
{
JLOG(p_journal_.warn()) << "FetchPack hash size malformed";
fee_ = Resource::feeInvalidRequest;
@@ -2148,8 +2238,7 @@ PeerImp::doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet
fee_ = Resource::feeHighBurdenPeer;
uint256 hash;
memcpy (hash.begin (), packet->ledgerhash ().data (), 32);
uint256 const hash {packet->ledgerhash()};
std::weak_ptr<PeerImp> weak = shared_from_this();
auto elapsed = UptimeClock::now();
@@ -2385,15 +2474,15 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
// Request is for a transaction candidate set
JLOG(p_journal_.trace()) << "GetLedger: Tx candidate set";
if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32))
if (!packet.has_ledgerhash() ||
!stringIsUint256Sized (packet.ledgerhash()))
{
charge (Resource::feeInvalidRequest);
JLOG(p_journal_.warn()) << "GetLedger: Tx candidate set invalid";
return;
}
uint256 txHash;
memcpy (txHash.begin (), packet.ledgerhash ().data (), 32);
uint256 const txHash {packet.ledgerhash()};
shared = app_.getInboundTransactions().getSet (txHash, false);
map = shared.get();
@@ -2447,16 +2536,14 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
if (packet.has_ledgerhash ())
{
uint256 ledgerhash;
if (packet.ledgerhash ().size () != 32)
if (! stringIsUint256Sized (packet.ledgerhash()))
{
charge (Resource::feeInvalidRequest);
JLOG(p_journal_.warn()) << "GetLedger: Invalid request";
return;
}
memcpy (ledgerhash.begin (), packet.ledgerhash ().data (), 32);
uint256 const ledgerhash {packet.ledgerhash()};
logMe += "LedgerHash:";
logMe += to_string (ledgerhash);
ledger = app_.getLedgerMaster ().getLedgerByHash (ledgerhash);

View File

@@ -38,6 +38,7 @@
#include <cstdint>
#include <deque>
#include <queue>
#include <shared_mutex>
namespace ripple {
@@ -48,7 +49,7 @@ class PeerImp
{
public:
/** Type of connection.
The affects how messages are routed.
This affects how messages are routed.
*/
enum class Type
{
@@ -121,15 +122,15 @@ private:
// These are up here to prevent warnings about order of initializations
//
OverlayImpl& overlay_;
bool m_inbound;
bool const m_inbound;
State state_; // Current state
std::atomic<Sanity> sanity_;
clock_type::time_point insaneTime_;
bool detaching_ = false;
// Node public key of peer.
PublicKey publicKey_;
PublicKey const publicKey_;
std::string name_;
uint256 sharedValue_;
std::shared_timed_mutex mutable nameMutex_;
// The indices of the smallest and largest ledgers this peer has available
//
@@ -143,14 +144,44 @@ private:
boost::optional<std::chrono::milliseconds> latency_;
boost::optional<std::uint32_t> lastPingSeq_;
clock_type::time_point lastPingTime_;
clock_type::time_point creationTime_;
clock_type::time_point const creationTime_;
// Notes on thread locking:
//
// During an audit it was noted that some member variables that looked
// like they need thread protection were not receiving it. And, indeed,
// that was correct. But the multi-phase initialization of PeerImp
// makes such an audit difficult. A further audit suggests that the
// locking is now protecting variables that don't need it. We're
// leaving that locking in place (for now) as a form of future proofing.
//
// Here are the variables that appear to need locking currently:
//
// o closedLedgerHash_
// o previousLedgerHash_
// o minLedger_
// o maxLedger_
// o recentLedgers_
// o recentTxSets_
// o insaneTime_
// o latency_
//
// The following variables are being protected preemptively:
//
// o name_
// o last_status_
// o lastPingSeq_
// o lastPingTime_
// o no_ping_
//
// June 2019
std::mutex mutable recentLock_;
protocol::TMStatusChange last_status_;
protocol::TMHello hello_;
protocol::TMHello const hello_;
Resource::Consumer usage_;
Resource::Charge fee_;
PeerFinder::Slot::ptr slot_;
PeerFinder::Slot::ptr const slot_;
boost::beast::multi_buffer read_buffer_;
http_request_type request_;
http_response_type response_;
@@ -375,6 +406,10 @@ private:
void
onWriteResponse (error_code ec, std::size_t bytes_transferred);
// A thread-safe way of getting the name.
std::string
getName() const;
//
// protocol message loop
//
@@ -451,8 +486,11 @@ private:
//--------------------------------------------------------------------------
// lockedRecentLock is passed as a reminder to callers that recentLock_
// must be locked.
void
addLedger (uint256 const& hash);
addLedger (uint256 const& hash,
std::lock_guard<std::mutex> const& lockedRecentLock);
void
doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet);