From b74e64bd7697e57007789e32c2e79133b70ec85b Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sat, 12 May 2012 22:14:16 -0700 Subject: [PATCH] Abstract out the peer set and timing code fromt he ledger acquire logic. There's probably a better way to handle the timer lambda, but this works. --- src/LedgerAcquire.cpp | 129 ++++++++++++++++++++++++------------------ src/LedgerAcquire.h | 53 +++++++++++------ 2 files changed, 110 insertions(+), 72 deletions(-) diff --git a/src/LedgerAcquire.cpp b/src/LedgerAcquire.cpp index 87229af6c..d2e4e3e85 100644 --- a/src/LedgerAcquire.cpp +++ b/src/LedgerAcquire.cpp @@ -9,15 +9,77 @@ #define LEDGER_ACQUIRE_TIMEOUT 2 -LedgerAcquire::LedgerAcquire(const uint256& hash) : mHash(hash), - mComplete(false), mFailed(false), mHaveBase(false), mHaveState(false), mHaveTransactions(false), - mTimer(theApp->getIOService()) +PeerSet::PeerSet(const uint256& hash, int interval) : mHash(hash), mTimerInterval(interval), + mComplete(false), mFailed(false), mTimer(theApp->getIOService()) +{ ; } + +void PeerSet::peerHas(Peer::pointer ptr) +{ + boost::recursive_mutex::scoped_lock sl(mLock); + std::list >::iterator it = mPeers.begin(); + while (it != mPeers.end()) + { + Peer::pointer pr = it->lock(); + if (!pr) // we have a dead entry, remove it + it = mPeers.erase(it); + else + { + if (pr->samePeer(ptr)) return; // we already have this peer + ++it; + } + } + mPeers.push_back(ptr); + newPeer(ptr); +} + +void PeerSet::badPeer(Peer::pointer ptr) +{ + boost::recursive_mutex::scoped_lock sl(mLock); + std::list >::iterator it=mPeers.begin(); + while (it != mPeers.end()) + { + Peer::pointer pr = it->lock(); + if (!pr) // we have a dead entry, remove it + it = mPeers.erase(it); + else + { + if (ptr->samePeer(pr)) + { // We found a pointer to the bad peer + mPeers.erase(it); + return; + } + ++it; + } + } +} + +void PeerSet::resetTimer() +{ + mTimer.expires_from_now(boost::posix_time::seconds(mTimerInterval)); + mTimer.async_wait(getTimerLamba()); +} + +void LedgerAcquire::LATimerEntry(boost::weak_ptr wptr, const boost::system::error_code& result) +{ + if (result == boost::asio::error::operation_aborted) return; + boost::shared_ptr ptr = wptr.lock(); + if (!!ptr) ptr->onTimer(); +} + +LedgerAcquire::LedgerAcquire(const uint256& hash) : PeerSet(hash, LEDGER_ACQUIRE_TIMEOUT), + mHaveBase(false), mHaveState(false), mHaveTransactions(false) { #ifdef DEBUG std::cerr << "Acquiring ledger " << mHash.GetHex() << std::endl; #endif } +boost::function LedgerAcquire::getTimerLamba() +{ + return boost::bind(&LedgerAcquire::LATimerEntry, boost::weak_ptr(shared_from_this()), + boost::asio::placeholders::error); +} + void LedgerAcquire::done() { #ifdef DEBUG @@ -25,28 +87,23 @@ void LedgerAcquire::done() #endif std::vector< boost::function > triggers; + setComplete(); mLock.lock(); triggers = mOnComplete; mOnComplete.empty(); mLock.unlock(); for (int i = 0; i::shared_from_this()); } -void LedgerAcquire::resetTimer() +struct null_deleter { - mTimer.expires_from_now(boost::posix_time::seconds(LEDGER_ACQUIRE_TIMEOUT)); - mTimer.async_wait(boost::bind(&LedgerAcquire::timerEntry, - boost::weak_ptr(shared_from_this()), boost::asio::placeholders::error)); -} - -void LedgerAcquire::timerEntry(boost::weak_ptr wptr, const boost::system::error_code& result) -{ - if (result == boost::asio::error::operation_aborted) return; - LedgerAcquire::pointer ptr = wptr.lock(); - if (!!ptr) ptr->trigger(Peer::pointer()); -} + void operator()(void const *) const + { // weak pointers never need to delete + assert(false); + } +}; void LedgerAcquire::addOnComplete(boost::function trigger) { @@ -217,46 +274,6 @@ void LedgerAcquire::sendRequest(boost::shared_ptr tmGL) } } -void LedgerAcquire::peerHas(Peer::pointer ptr) -{ - boost::recursive_mutex::scoped_lock sl(mLock); - std::list >::iterator it = mPeers.begin(); - while (it != mPeers.end()) - { - Peer::pointer pr = it->lock(); - if (!pr) // we have a dead entry, remove it - it = mPeers.erase(it); - else - { - if (pr->samePeer(ptr)) return; // we already have this peer - ++it; - } - } - mPeers.push_back(ptr); - if (mPeers.size() == 1) trigger(ptr); -} - -void LedgerAcquire::badPeer(Peer::pointer ptr) -{ - boost::recursive_mutex::scoped_lock sl(mLock); - std::list >::iterator it=mPeers.begin(); - while (it != mPeers.end()) - { - Peer::pointer pr = it->lock(); - if (!pr) // we have a dead entry, remove it - it = mPeers.erase(it); - else - { - if (ptr->samePeer(pr)) - { // We found a pointer to the bad peer - mPeers.erase(it); - return; - } - ++it; - } - } -} - bool LedgerAcquire::takeBase(const std::string& data, Peer::pointer peer) { // Return value: true=normal, false=bad data #ifdef DEBUG diff --git a/src/LedgerAcquire.h b/src/LedgerAcquire.h index 9ae2c2ffd..8d75ea7d3 100644 --- a/src/LedgerAcquire.h +++ b/src/LedgerAcquire.h @@ -13,36 +13,60 @@ #include "Peer.h" #include "../obj/src/newcoin.pb.h" -class LedgerAcquire : public boost::enable_shared_from_this +class PeerSet +{ +protected: + uint256 mHash; + int mTimerInterval; + bool mComplete, mFailed; + + boost::recursive_mutex mLock; + boost::asio::deadline_timer mTimer; + std::list< boost::weak_ptr > mPeers; + + PeerSet(const uint256& hash, int interval); + +public: + const uint256& getHash() const { return mHash; } + bool isComplete() const { return mComplete; } + bool isFailed() const { return mFailed; } + + void peerHas(Peer::pointer); + void badPeer(Peer::pointer); + void resetTimer(); + +protected: + virtual void newPeer(Peer::pointer) = 0; + virtual boost::function getTimerLamba() = 0; + + void setComplete() { mComplete = true; } + void setFailed() { mFailed = true; } +}; + +class LedgerAcquire : public PeerSet, public boost::enable_shared_from_this { // A ledger we are trying to acquire public: typedef boost::shared_ptr pointer; protected: - boost::recursive_mutex mLock; Ledger::pointer mLedger; - uint256 mHash; - bool mComplete, mFailed, mHaveBase, mHaveState, mHaveTransactions; - - boost::asio::deadline_timer mTimer; + bool mHaveBase, mHaveState, mHaveTransactions; std::vector< boost::function > mOnComplete; - std::list > mPeers; // peers known to have this ledger - void done(); + void onTimer() { trigger(Peer::pointer()); } - static void timerEntry(boost::weak_ptr, const boost::system::error_code&); void sendRequest(boost::shared_ptr message); void sendRequest(boost::shared_ptr message, Peer::pointer peer); - void trigger(Peer::pointer peer); + void newPeer(Peer::pointer peer) { trigger(peer); } + void trigger(Peer::pointer); + virtual boost::function getTimerLamba(); + static void LATimerEntry(boost::weak_ptr, const boost::system::error_code&); public: LedgerAcquire(const uint256& hash); - const uint256& getHash() const { return mHash; } - bool isComplete() const { return mComplete; } - bool isFailed() const { return mFailed; } bool isBase() const { return mHaveBase; } bool isAcctStComplete() const { return mHaveState; } bool isTransComplete() const { return mHaveTransactions; } @@ -50,14 +74,11 @@ public: void addOnComplete(boost::function); - void peerHas(Peer::pointer); - void badPeer(Peer::pointer); bool takeBase(const std::string& data, Peer::pointer); bool takeTxNode(const std::list& IDs, const std::list >& data, Peer::pointer); bool takeAsNode(const std::list& IDs, const std::list >& data, Peer::pointer); - void resetTimer(); }; class LedgerAcquireMaster