Simplify PeerSet, InboundLedger and TransactionAcquire:

* Use std::mutex instead of std::recursive_mutex
* Remove unnecessary type alias
* Use std::set instead of ripple::hash_map
* Don't reinvent virtual functions
This commit is contained in:
Nik Bougalis
2016-05-31 00:05:25 -07:00
parent d1200224e2
commit 279c2a6f82
13 changed files with 106 additions and 129 deletions

View File

@@ -59,6 +59,9 @@ public:
~InboundLedger ();
// Called when the PeerSet timer expires
void execute () override;
// Called when another attempt is made to fetch this same ledger
void update (std::uint32_t seq);
@@ -97,7 +100,7 @@ private:
std::vector<std::pair<SHAMapNodeID, uint256>>& nodes,
TriggerReason reason);
void trigger (Peer::ptr const&, TriggerReason);
void trigger (std::shared_ptr<Peer> const&, TriggerReason);
std::vector<neededHash_t> getNeededHashes ();
@@ -106,9 +109,9 @@ private:
void done ();
void onTimer (bool progress, ScopedLockType& peerSetLock);
void onTimer (bool progress, ScopedLockType& peerSetLock) override;
void newPeer (Peer::ptr const& peer)
void newPeer (std::shared_ptr<Peer> const& peer) override
{
// For historical nodes, do not trigger too soon
// since a fetch pack is probably coming
@@ -116,7 +119,7 @@ private:
trigger (peer, TriggerReason::added);
}
std::weak_ptr <PeerSet> pmDowncast ();
std::weak_ptr <PeerSet> pmDowncast () override;
int processData (std::shared_ptr<Peer> peer, protocol::TMLedgerData& data);
@@ -163,7 +166,7 @@ private:
SHAMapAddNode mStats;
// Data we have received from peers
std::recursive_mutex mReceivedDataLock;
std::mutex mReceivedDataLock;
std::vector <PeerDataPairType> mReceivedData;
bool mReceiveDispatched;
};

View File

@@ -68,7 +68,7 @@ auto constexpr ledgerAcquireTimeout = 2500ms;
InboundLedger::InboundLedger (
Application& app, uint256 const& hash, std::uint32_t seq, fcReason reason, clock_type& clock)
: PeerSet (app, hash, ledgerAcquireTimeout, false, clock,
: PeerSet (app, hash, ledgerAcquireTimeout, clock,
app.journal("InboundLedger"))
, mHaveHeader (false)
, mHaveState (false)
@@ -112,6 +112,23 @@ void InboundLedger::init (ScopedLockType& collectionLock)
}
}
void InboundLedger::execute ()
{
if (app_.getJobQueue ().getJobCountTotal (jtLEDGER_DATA) > 4)
{
JLOG (m_journal.debug()) <<
"Deferring InboundLedger timer due to load";
setTimer ();
return;
}
app_.getJobQueue ().addJob (
jtLEDGER_DATA, "InboundLedger",
[ptr = shared_from_this()] (Job&)
{
ptr->invokeOnTimer ();
});
}
void InboundLedger::update (std::uint32_t seq)
{
ScopedLockType sl (mLock);
@@ -371,10 +388,10 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
// otherwise, we need to trigger before we add
// so each peer gets triggered once
if (mReason != fcHISTORY)
trigger (Peer::ptr (), TriggerReason::timeout);
trigger (nullptr, TriggerReason::timeout);
addPeers ();
if (mReason == fcHISTORY)
trigger (Peer::ptr (), TriggerReason::timeout);
trigger (nullptr, TriggerReason::timeout);
}
}
@@ -436,7 +453,7 @@ void InboundLedger::done ()
/** Request more nodes, perhaps from a specific peer
*/
void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
void InboundLedger::trigger (std::shared_ptr<Peer> const& peer, TriggerReason reason)
{
ScopedLockType sl (mLock);
@@ -518,9 +535,9 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
auto packet = std::make_shared <Message> (
tmBH, protocol::mtGET_OBJECTS);
for (auto const& it : mPeers)
for (auto id : mPeers)
{
if (auto p = app_.overlay ().findPeerByShortID (it.first))
if (auto p = app_.overlay ().findPeerByShortID (id))
{
mByHash = false;
p->send (packet);
@@ -1035,7 +1052,7 @@ InboundLedger::getNeededHashes ()
bool InboundLedger::gotData (std::weak_ptr<Peer> peer,
std::shared_ptr<protocol::TMLedgerData> data)
{
std::lock_guard<std::recursive_mutex> sl (mReceivedDataLock);
std::lock_guard<std::mutex> sl (mReceivedDataLock);
if (isDone ())
return false;
@@ -1177,11 +1194,12 @@ void InboundLedger::runData ()
int chosenPeerCount = -1;
std::vector <PeerDataPairType> data;
do
for (;;)
{
data.clear();
{
std::lock_guard<std::recursive_mutex> sl (mReceivedDataLock);
std::lock_guard<std::mutex> sl (mReceivedDataLock);
if (mReceivedData.empty ())
{
@@ -1206,8 +1224,7 @@ void InboundLedger::runData ()
}
}
}
} while (1);
}
if (chosenPeer)
trigger (chosenPeer, TriggerReason::reply);

View File

@@ -538,7 +538,7 @@ LedgerMaster::getFetchPack (LedgerHash missingHash, LedgerIndex missingIndex)
// Select target Peer based on highest score. The score is randomized
// but biased in favor of Peers with low latency.
Peer::ptr target;
std::shared_ptr<Peer> target;
{
int maxScore = 0;
auto peerList = app_.overlay ().getActivePeers();
@@ -1826,7 +1826,7 @@ LedgerMaster::makeFetchPack (
return;
}
Peer::ptr peer = wPeer.lock ();
auto peer = wPeer.lock ();
if (!peer)
return;

View File

@@ -42,7 +42,7 @@ enum
};
TransactionAcquire::TransactionAcquire (Application& app, uint256 const& hash, clock_type& clock)
: PeerSet (app, hash, TX_ACQUIRE_TIMEOUT, true, clock,
: PeerSet (app, hash, TX_ACQUIRE_TIMEOUT, clock,
app.journal("TransactionAcquire"))
, mHaveRoot (false)
, j_(app.journal("TransactionAcquire"))
@@ -56,6 +56,16 @@ TransactionAcquire::~TransactionAcquire ()
{
}
void TransactionAcquire::execute ()
{
app_.getJobQueue ().addJob (
jtTXN_DATA, "TransactionAcquire",
[ptr = shared_from_this()](Job&)
{
ptr->invokeOnTimer ();
});
}
void TransactionAcquire::done ()
{
// We hold a PeerSet lock and so cannot do real work here
@@ -99,7 +109,7 @@ void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl)
}
if (aggressive)
trigger (Peer::ptr ());
trigger (nullptr);
addPeers (1);
}
@@ -109,7 +119,7 @@ std::weak_ptr<PeerSet> TransactionAcquire::pmDowncast ()
return std::dynamic_pointer_cast<PeerSet> (shared_from_this ());
}
void TransactionAcquire::trigger (Peer::ptr const& peer)
void TransactionAcquire::trigger (std::shared_ptr<Peer> const& peer)
{
if (mComplete)
{
@@ -173,7 +183,7 @@ void TransactionAcquire::trigger (Peer::ptr const& peer)
}
SHAMapAddNode TransactionAcquire::takeNodes (const std::list<SHAMapNodeID>& nodeIDs,
const std::list< Blob >& data, Peer::ptr const& peer)
const std::list< Blob >& data, std::shared_ptr<Peer> const& peer)
{
ScopedLockType sl (mLock);

View File

@@ -48,7 +48,7 @@ public:
}
SHAMapAddNode takeNodes (const std::list<SHAMapNodeID>& IDs,
const std::list< Blob >& data, Peer::ptr const&);
const std::list< Blob >& data, std::shared_ptr<Peer> const&);
void init (int startPeers);
@@ -60,10 +60,12 @@ private:
bool mHaveRoot;
beast::Journal j_;
void onTimer (bool progress, ScopedLockType& peerSetLock);
void execute () override;
void onTimer (bool progress, ScopedLockType& peerSetLock) override;
void newPeer (Peer::ptr const& peer)
void newPeer (std::shared_ptr<Peer> const& peer) override
{
trigger (peer);
}
@@ -73,8 +75,8 @@ private:
// Tries to add the specified number of peers
void addPeers (int num);
void trigger (Peer::ptr const&);
std::weak_ptr<PeerSet> pmDowncast ();
void trigger (std::shared_ptr<Peer> const&);
std::weak_ptr<PeerSet> pmDowncast () override;
};
} // ripple

View File

@@ -1520,10 +1520,7 @@ void NetworkOPsImp::endConsensus (bool correctLCL)
{
uint256 deadLedger = m_ledgerMaster.getClosedLedger ()->info().parentHash;
// Why do we make a copy of the peer list here?
std::vector <Peer::ptr> peerList = app_.overlay ().getActivePeers ();
for (auto const& it : peerList)
for (auto const& it : app_.overlay ().getActivePeers ())
{
if (it && (it->getClosedLedgerHash () == deadLedger))
{

View File

@@ -73,7 +73,7 @@ public:
int ipLimit = 0;
};
using PeerSequence = std::vector <Peer::ptr>;
using PeerSequence = std::vector <std::shared_ptr<Peer>>;
virtual ~Overlay() = default;
@@ -139,7 +139,7 @@ public:
/** Returns the peer with the matching short id, or null. */
virtual
Peer::ptr
std::shared_ptr<Peer>
findPeerByShortID (Peer::id_t const& id) = 0;
/** Broadcast a proposal. */
@@ -176,7 +176,7 @@ public:
/** Visit every active peer and return a value
The functor must:
- Be callable as:
void operator()(Peer::ptr const& peer);
void operator()(std::shared_ptr<Peer> const& peer);
- Must have the following type alias:
using return_type = void;
- Be callable as:
@@ -193,16 +193,15 @@ public:
typename UnaryFunc::return_type>
foreach (UnaryFunc f)
{
PeerSequence peers (getActivePeers());
for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i)
f (*i);
for (auto const& p : getActivePeers())
f (p);
return f();
}
/** Visit every active peer
The visitor functor must:
- Be callable as:
void operator()(Peer::ptr const& peer);
void operator()(std::shared_ptr<Peer> const& peer);
- Must have the following type alias:
using return_type = void;
@@ -215,10 +214,8 @@ public:
>
foreach(Function f)
{
PeerSequence peers (getActivePeers());
for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i)
f (*i);
for (auto const& p : getActivePeers())
f (p);
}
/** Select from active peers

View File

@@ -28,6 +28,7 @@
#include <ripple/beast/utility/Journal.h>
#include <boost/asio/basic_waitable_timer.hpp>
#include <mutex>
#include <set>
namespace ripple {
@@ -97,7 +98,7 @@ public:
This will call the derived class hook function.
@return `true` If the peer was added
*/
bool insert (Peer::ptr const&);
bool insert (std::shared_ptr<Peer> const&);
virtual bool isDone () const
{
@@ -110,24 +111,20 @@ public:
return app_;
}
private:
static void timerEntry (
std::weak_ptr<PeerSet>, const boost::system::error_code& result,
beast::Journal j);
static void timerJobEntry (std::shared_ptr<PeerSet>);
protected:
using ScopedLockType = std::unique_lock <std::recursive_mutex>;
PeerSet (Application& app, uint256 const& hash, std::chrono::milliseconds interval,
bool txnData, clock_type& clock, beast::Journal journal);
clock_type& clock, beast::Journal journal);
virtual ~PeerSet() = 0;
virtual void newPeer (Peer::ptr const&) = 0;
virtual void newPeer (std::shared_ptr<Peer> const&) = 0;
virtual void onTimer (bool progress, ScopedLockType&) = 0;
virtual void execute () = 0;
virtual std::weak_ptr<PeerSet> pmDowncast () = 0;
bool isProgress ()
@@ -148,7 +145,7 @@ protected:
void sendRequest (const protocol::TMGetLedger& message);
void sendRequest (const protocol::TMGetLedger& message, Peer::ptr const& peer);
void sendRequest (const protocol::TMGetLedger& message, std::shared_ptr<Peer> const& peer);
void setTimer ();
@@ -166,19 +163,14 @@ protected:
int mTimeouts;
bool mComplete;
bool mFailed;
bool mTxnData;
clock_type::time_point mLastAction;
bool mProgress;
// VFALCO TODO move the responsibility for the timer to a higher level
boost::asio::basic_waitable_timer<std::chrono::steady_clock> mTimer;
// VFALCO TODO Verify that these are used in the way that the names suggest.
using PeerIdentifier = Peer::id_t;
using ReceivedChunkCount = int;
using PeerSetMap = hash_map <PeerIdentifier, ReceivedChunkCount>;
PeerSetMap mPeers;
// The identifiers of the peers we are tracking.
std::set <Peer::id_t> mPeers;
};
} // ripple

View File

@@ -57,7 +57,7 @@ struct get_peer_json
get_peer_json ()
{ }
void operator() (Peer::ptr const& peer)
void operator() (std::shared_ptr<Peer> const& peer)
{
json.append (peer->json ());
}
@@ -930,14 +930,14 @@ OverlayImpl::check ()
});
}
Peer::ptr
std::shared_ptr<Peer>
OverlayImpl::findPeerByShortID (Peer::id_t const& id)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
auto const iter = ids_.find (id);
if (iter != ids_.end ())
return iter->second.lock();
return Peer::ptr();
return {};
}
void

View File

@@ -178,7 +178,7 @@ public:
void
checkSanity (std::uint32_t) override;
Peer::ptr
std::shared_ptr<Peer>
findPeerByShortID (Peer::id_t const& id) override;
void

View File

@@ -1147,7 +1147,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMLedgerData> const& m)
if (m->has_requestcookie ())
{
Peer::ptr target = overlay_.findPeerByShortID (m->requestcookie ());
std::shared_ptr<Peer> target = overlay_.findPeerByShortID (m->requestcookie ());
if (target)
{
m->clear_requestcookie ();

View File

@@ -38,8 +38,8 @@ class InboundLedger;
// function pure virtual?
//
PeerSet::PeerSet (Application& app, uint256 const& hash,
std::chrono::milliseconds interval, bool txnData,
clock_type& clock, beast::Journal journal)
std::chrono::milliseconds interval, clock_type& clock,
beast::Journal journal)
: app_ (app)
, m_journal (journal)
, m_clock (clock)
@@ -48,7 +48,6 @@ PeerSet::PeerSet (Application& app, uint256 const& hash,
, mTimeouts (0)
, mComplete (false)
, mFailed (false)
, mTxnData (txnData)
, mProgress (false)
, mTimer (app_.getIOService ())
{
@@ -60,11 +59,11 @@ PeerSet::~PeerSet ()
{
}
bool PeerSet::insert (Peer::ptr const& ptr)
bool PeerSet::insert (std::shared_ptr<Peer> const& ptr)
{
ScopedLockType sl (mLock);
if (!mPeers.insert (std::make_pair (ptr->id (), 0)).second)
if (!mPeers.insert (ptr->id ()).second)
return false;
newPeer (ptr);
@@ -74,8 +73,15 @@ bool PeerSet::insert (Peer::ptr const& ptr)
void PeerSet::setTimer ()
{
mTimer.expires_from_now(mTimerInterval);
mTimer.async_wait (std::bind (&PeerSet::timerEntry, pmDowncast (),
beast::asio::placeholders::error, m_journal));
mTimer.async_wait (
[wptr=pmDowncast()](boost::system::error_code const& ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (auto ptr = wptr.lock ())
ptr->execute ();
});
}
void PeerSet::invokeOnTimer ()
@@ -102,58 +108,13 @@ void PeerSet::invokeOnTimer ()
setTimer ();
}
void PeerSet::timerEntry (
std::weak_ptr<PeerSet> wptr, const boost::system::error_code& result,
beast::Journal j)
{
if (result == boost::asio::error::operation_aborted)
return;
std::shared_ptr<PeerSet> ptr = wptr.lock ();
if (ptr)
{
// VFALCO NOTE So this function is really two different functions depending on
// the value of mTxnData, which is directly tied to whether we are
// a base class of IncomingLedger or TransactionAcquire
//
if (ptr->mTxnData)
{
ptr->app_.getJobQueue ().addJob (
jtTXN_DATA, "timerEntryTxn", [ptr] (Job&) {
timerJobEntry(ptr);
});
}
else
{
int jc = ptr->app_.getJobQueue ().getJobCountTotal (jtLEDGER_DATA);
if (jc > 4)
{
JLOG (j.debug()) << "Deferring PeerSet timer due to load";
ptr->setTimer ();
}
else
ptr->app_.getJobQueue ().addJob (
jtLEDGER_DATA, "timerEntryLgr", [ptr] (Job&) {
timerJobEntry(ptr);
});
}
}
}
void PeerSet::timerJobEntry (std::shared_ptr<PeerSet> ptr)
{
ptr->invokeOnTimer ();
}
bool PeerSet::isActive ()
{
ScopedLockType sl (mLock);
return !isDone ();
}
void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL, Peer::ptr const& peer)
void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL, std::shared_ptr<Peer> const& peer)
{
if (!peer)
sendRequest (tmGL);
@@ -171,11 +132,9 @@ void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL)
Message::pointer packet (
std::make_shared<Message> (tmGL, protocol::mtGET_LEDGER));
for (auto const& p : mPeers)
for (auto id : mPeers)
{
Peer::ptr peer (app_.overlay ().findPeerByShortID (p.first));
if (peer)
if (auto peer = app_.overlay ().findPeerByShortID (id))
peer->send (packet);
}
}
@@ -184,9 +143,9 @@ std::size_t PeerSet::getPeerCount () const
{
std::size_t ret (0);
for (auto const& p : mPeers)
for (auto id : mPeers)
{
if (app_.overlay ().findPeerByShortID (p.first))
if (app_.overlay ().findPeerByShortID (id))
++ret;
}

View File

@@ -38,7 +38,7 @@ struct send_always
: msg(m)
{ }
void operator()(Peer::ptr const& peer) const
void operator()(std::shared_ptr<Peer> const& peer) const
{
peer->send (msg);
}
@@ -59,7 +59,7 @@ struct send_if_pred
: msg(m), predicate(p)
{ }
void operator()(Peer::ptr const& peer) const
void operator()(std::shared_ptr<Peer> const& peer) const
{
if (predicate (peer))
peer->send (msg);
@@ -90,7 +90,7 @@ struct send_if_not_pred
: msg(m), predicate(p)
{ }
void operator()(Peer::ptr const& peer) const
void operator()(std::shared_ptr<Peer> const& peer) const
{
if (!predicate (peer))
peer->send (msg);
@@ -117,7 +117,7 @@ struct match_peer
: matchPeer (match)
{ }
bool operator() (Peer::ptr const& peer) const
bool operator() (std::shared_ptr<Peer> const& peer) const
{
if(matchPeer && (peer.get () == matchPeer))
return true;
@@ -137,7 +137,7 @@ struct peer_in_cluster
: skipPeer (skip)
{ }
bool operator() (Peer::ptr const& peer) const
bool operator() (std::shared_ptr<Peer> const& peer) const
{
if (skipPeer (peer))
return false;
@@ -160,7 +160,7 @@ struct peer_in_set
: peerSet (peers)
{ }
bool operator() (Peer::ptr const& peer) const
bool operator() (std::shared_ptr<Peer> const& peer) const
{
if (peerSet.count (peer->id ()) == 0)
return false;