From 0b52905cb72bfc38c61773144db3918fb6688df0 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Tue, 12 Mar 2013 14:12:48 -0700 Subject: [PATCH 1/2] Fix another thread race in the websocketpp data code. --- src/cpp/websocketpp/src/messages/data.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cpp/websocketpp/src/messages/data.hpp b/src/cpp/websocketpp/src/messages/data.hpp index ff1b02bdcd..3d2d83222a 100644 --- a/src/cpp/websocketpp/src/messages/data.hpp +++ b/src/cpp/websocketpp/src/messages/data.hpp @@ -212,12 +212,12 @@ private: typedef websocketpp::processor::hybi_util::masking_key_type masking_key_type; friend void intrusive_ptr_add_ref(const data * s) { - boost::unique_lock lock(s->m_lock); + boost::unique_lock lock(s->m_lock); ++s->m_ref_count; } friend void intrusive_ptr_release(const data * s) { - boost::unique_lock lock(s->m_lock); + boost::unique_lock lock(s->m_lock); // TODO: thread safety long count = --s->m_ref_count; @@ -259,7 +259,7 @@ private: mutable boost::detail::atomic_count m_ref_count; mutable pool_weak_ptr m_pool; mutable bool m_live; - mutable boost::mutex m_lock; + mutable boost::recursive_mutex m_lock; }; typedef boost::intrusive_ptr data_ptr; From e9a02882f1431356daf7bf4ed37abbc1fc29cf24 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Tue, 12 Mar 2013 15:17:26 -0700 Subject: [PATCH 2/2] Don't let ledger fetching go too fast. --- src/cpp/ripple/JobQueue.cpp | 8 ++++++++ src/cpp/ripple/JobQueue.h | 5 +++-- src/cpp/ripple/Ledger.cpp | 2 +- src/cpp/ripple/LedgerMaster.cpp | 2 +- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index d5cd438fab..904af7e8e4 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -118,6 +118,14 @@ int JobQueue::getJobCount(JobType t) return (c == mJobCounts.end()) ? 0 : c->second.first; } +int JobQueue::getJobCountTotal(JobType t) +{ + boost::mutex::scoped_lock sl(mJobLock); + + std::map< JobType, std::pair >::iterator c = mJobCounts.find(t); + return (c == mJobCounts.end()) ? 0 : (c->second.first + c->second.second); +} + int JobQueue::getJobCountGE(JobType t) { // return the number of jobs at this priority level or greater int ret = 0; diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index 159b7dfa6a..bf439cc6e1 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -103,8 +103,9 @@ public: void addJob(JobType type, const std::string& name, const boost::function& job); - int getJobCount(JobType t); // Jobs at this priority - int getJobCountGE(JobType t); // All jobs at or greater than this priority + int getJobCount(JobType t); // Jobs waiting at this priority + int getJobCountTotal(JobType t); // Jobs waiting plus running at this priority + int getJobCountGE(JobType t); // All waiting jobs at or greater than this priority std::vector< std::pair > > getJobCounts(); // jobs waiting, threads doing void shutdown(); diff --git a/src/cpp/ripple/Ledger.cpp b/src/cpp/ripple/Ledger.cpp index 2fbf26eb18..52ce07c506 100644 --- a/src/cpp/ripple/Ledger.cpp +++ b/src/cpp/ripple/Ledger.cpp @@ -512,7 +512,7 @@ void Ledger::saveAcceptedLedger(Job&, bool fromConsensus) if (!fromConsensus) dropCache(); - if (theApp->getJobQueue().getJobCount(jtPUBOLDLEDGER) == 0) + if (theApp->getJobQueue().getJobCountTotal(jtPUBOLDLEDGER) == 0) theApp->getLedgerMaster().resumeAcquiring(); } diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index 4d79e51668..892d70822c 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -388,7 +388,7 @@ void LedgerMaster::setFullLedger(Ledger::pointer ledger) return; } - if (theApp->getJobQueue().getJobCount(jtPUBOLDLEDGER) > 2) + if (theApp->getJobQueue().getJobCountTotal(jtPUBOLDLEDGER) > 1) { cLog(lsDEBUG) << "Too many pending ledger saves"; return;