diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj
index d0b6f45029..a465874930 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj
+++ b/Builds/VisualStudio2015/RippleD.vcxproj
@@ -1935,6 +1935,8 @@
+
+
@@ -4417,6 +4419,10 @@
True
True
+
+ True
+ True
+
True
True
diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters
index c2ab06c3d2..42399cfd1c 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters
@@ -2532,6 +2532,9 @@
ripple\core
+
+ ripple\core
+
ripple\core
@@ -5160,6 +5163,9 @@
test\core
+
+ test\core
+
test\core
diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp
index b9f62dd67d..5b60b1f787 100644
--- a/src/ripple/app/misc/NetworkOPs.cpp
+++ b/src/ripple/app/misc/NetworkOPs.cpp
@@ -19,13 +19,9 @@
#include
#include
-#include
-#include
#include
-#include
#include
#include
-#include
#include
#include
#include
@@ -39,36 +35,21 @@
#include
#include
#include
-#include
#include
#include
#include
-#include
-#include
#include
-#include
-#include
-#include
#include
-#include
-#include
#include
#include
-#include
+#include
#include
#include
#include
-#include
#include
#include
#include
#include
-#include
-#include
-#include
-#include
-#include
-#include
#include
#include
#include
@@ -76,11 +57,6 @@
#include
#include
#include
-#include
-#include
-#include
-#include
-#include
namespace ripple {
@@ -237,7 +213,10 @@ public:
{
}
- ~NetworkOPsImp() override = default;
+ ~NetworkOPsImp() override
+ {
+ jobCounter_.join();
+ }
public:
OperatingMode getOperatingMode () const override
@@ -508,6 +487,9 @@ public:
m_heartbeatTimer.cancel();
m_clusterTimer.cancel();
+ // Wait until all our in-flight Jobs are completed.
+ jobCounter_.join();
+
stopped ();
}
@@ -559,6 +541,7 @@ private:
DeadlineTimer m_heartbeatTimer;
DeadlineTimer m_clusterTimer;
+ JobCounter jobCounter_;
std::shared_ptr mConsensus;
@@ -668,13 +651,15 @@ void NetworkOPsImp::onDeadlineTimer (DeadlineTimer& timer)
{
if (timer == m_heartbeatTimer)
{
- m_job_queue.addJob (jtNETOP_TIMER, "NetOPs.heartbeat",
- [this] (Job&) { processHeartbeatTimer(); });
+ m_job_queue.addCountedJob (
+ jtNETOP_TIMER, "NetOPs.heartbeat", jobCounter_,
+ [this] (Job&) { processHeartbeatTimer(); });
}
else if (timer == m_clusterTimer)
{
- m_job_queue.addJob (jtNETOP_CLUSTER, "NetOPs.cluster",
- [this] (Job&) { processClusterTimer(); });
+ m_job_queue.addCountedJob (
+ jtNETOP_CLUSTER, "NetOPs.cluster", jobCounter_,
+ [this] (Job&) { processClusterTimer(); });
}
}
@@ -740,6 +725,7 @@ void NetworkOPsImp::processClusterTimer ()
if (!update)
{
JLOG(m_journal.debug()) << "Too soon to send cluster update";
+ setClusterTimer ();
return;
}
@@ -841,10 +827,12 @@ void NetworkOPsImp::submitTransaction (std::shared_ptr const& iTrans
auto tx = std::make_shared (
trans, reason, app_);
- m_job_queue.addJob (jtTRANSACTION, "submitTxn", [this, tx] (Job&) {
- auto t = tx;
- processTransaction(t, false, false, FailHard::no);
- });
+ m_job_queue.addCountedJob (
+ jtTRANSACTION, "submitTxn", jobCounter_,
+ [this, tx] (Job&) {
+ auto t = tx;
+ processTransaction(t, false, false, FailHard::no);
+ });
}
void NetworkOPsImp::processTransaction (std::shared_ptr& transaction,
@@ -906,9 +894,12 @@ void NetworkOPsImp::doTransactionAsync (std::shared_ptr transaction
if (mDispatchState == DispatchState::none)
{
- m_job_queue.addJob (jtBATCH, "transactionBatch",
- [this] (Job&) { transactionBatch(); });
- mDispatchState = DispatchState::scheduled;
+ if (m_job_queue.addCountedJob (
+ jtBATCH, "transactionBatch", jobCounter_,
+ [this] (Job&) { transactionBatch(); }))
+ {
+ mDispatchState = DispatchState::scheduled;
+ }
}
}
@@ -938,9 +929,12 @@ void NetworkOPsImp::doTransactionSync (std::shared_ptr transaction,
if (mTransactions.size())
{
// More transactions need to be applied, but by another job.
- m_job_queue.addJob (jtBATCH, "transactionBatch",
- [this] (Job&) { transactionBatch(); });
- mDispatchState = DispatchState::scheduled;
+ if (m_job_queue.addCountedJob (
+ jtBATCH, "transactionBatch", jobCounter_,
+ [this] (Job&) { transactionBatch(); }))
+ {
+ mDispatchState = DispatchState::scheduled;
+ }
}
}
}
@@ -2460,12 +2454,12 @@ void NetworkOPsImp::reportFeeChange ()
app_.getTxQ().getMetrics(*app_.openLedger().current()),
app_.getFeeTrack()};
-
// only schedule the job if something has changed
if (f != mLastFeeSummary)
{
- m_job_queue.addJob ( jtCLIENT, "reportFeeChange->pubServer",
- [this] (Job&) { pubServer(); });
+ m_job_queue.addCountedJob (
+ jtCLIENT, "reportFeeChange->pubServer", jobCounter_,
+ [this] (Job&) { pubServer(); });
}
}
diff --git a/src/ripple/core/JobCounter.h b/src/ripple/core/JobCounter.h
new file mode 100644
index 0000000000..834f3bd677
--- /dev/null
+++ b/src/ripple/core/JobCounter.h
@@ -0,0 +1,191 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2017 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#ifndef RIPPLE_CORE_JOB_COUNTER_H_INCLUDED
+#define RIPPLE_CORE_JOB_COUNTER_H_INCLUDED
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace ripple {
+
+// A class that does reference counting for Jobs. The reference counting
+// allows a Stoppable to assure that all child Jobs in the JobQueue are
+// completed before the Stoppable declares itself stopped().
+class JobCounter
+{
+private:
+ std::mutex mutable mutex_ {};
+ std::condition_variable allJobsDoneCond_ {}; // guard with mutex_
+ bool waitForJobs_ {false}; // guard with mutex_
+ std::atomic jobCount_ {0};
+
+ // Increment the count.
+ JobCounter& operator++()
+ {
+ ++jobCount_;
+ return *this;
+ }
+
+ // Decrement the count. If we're stopping and the count drops to zero
+ // notify allJobsDoneCond_.
+ JobCounter& operator--()
+ {
+ // Even though jobCount_ is atomic, we decrement its value under a
+ // lock. This removes a small timing window that occurs if the
+ // waiting thread is handling a spurious wakeup when jobCount_
+ // drops to zero.
+ std::lock_guard lock {mutex_};
+
+ // Update jobCount_. Notify if we're stopping and all jobs are done.
+ if ((--jobCount_ == 0) && waitForJobs_)
+ allJobsDoneCond_.notify_all();
+ return *this;
+ }
+
+ // A private template class that helps count the number of Jobs
+ // in flight. This allows Stoppables to hold off declaring stopped()
+ // until all their JobQueue Jobs are dispatched.
+ template
+ class CountedJob
+ {
+ private:
+ JobCounter& counter_;
+ JobHandler handler_;
+
+ static_assert (
+ std::is_same())), void>::value,
+ "JobHandler must be callable with Job&");
+
+ public:
+ CountedJob() = delete;
+
+ CountedJob (CountedJob const& rhs)
+ : counter_ (rhs.counter_)
+ , handler_ (rhs.handler_)
+ {
+ ++counter_;
+ }
+
+ CountedJob (CountedJob&& rhs)
+ : counter_ (rhs.counter_)
+ , handler_ (std::move (rhs.handler_))
+ {
+ ++counter_;
+ }
+
+ CountedJob (JobCounter& counter, JobHandler&& handler)
+ : counter_ (counter)
+ , handler_ (std::move (handler))
+ {
+ ++counter_;
+ }
+
+ CountedJob& operator=(CountedJob const& rhs) = delete;
+ CountedJob& operator=(CountedJob&& rhs) = delete;
+
+ ~CountedJob()
+ {
+ --counter_;
+ }
+
+ void operator ()(Job& job)
+ {
+ return handler_ (job);
+ }
+ };
+
+public:
+ JobCounter() = default;
+ // Not copyable or movable. Outstanding counts would be hard to sort out.
+ JobCounter (JobCounter const&) = delete;
+
+ JobCounter& operator=(JobCounter const&) = delete;
+
+ /** Destructor verifies all in-flight jobs are complete. */
+ ~JobCounter()
+ {
+ join();
+ }
+
+ /** Returns once all counted in-flight Jobs are destroyed. */
+ void join()
+ {
+ std::unique_lock lock {mutex_};
+ waitForJobs_ = true;
+ if (jobCount_ > 0)
+ {
+ allJobsDoneCond_.wait (
+ lock, [this] { return jobCount_ == 0; });
+ }
+ }
+
+ /** Wrap the passed lambda with a reference counter.
+
+ @param handler Lambda that accepts a Job& parameter and returns void.
+ @return If join() has been called returns boost::none. Otherwise
+ returns a boost::optional that wraps handler with a
+ reference counter.
+ */
+ template
+ boost::optional>
+ wrap (JobHandler&& handler)
+ {
+ // The current intention is that wrap() may only be called with an
+ // rvalue lambda. That can be adjusted in the future if needed,
+ // but the following static_assert covers current expectations.
+ static_assert (std::is_rvalue_reference::value,
+ "JobCounter::wrap() only supports rvalue lambdas.");
+
+ boost::optional> ret;
+
+ std::lock_guard lock {mutex_};
+ if (! waitForJobs_)
+ ret.emplace (*this, std::move (handler));
+
+ return ret;
+ }
+
+ /** Current number of Jobs outstanding. Only useful for testing. */
+ int count() const
+ {
+ return jobCount_;
+ }
+
+ /** Returns true if this has been joined.
+
+ Even if true is returned, counted Jobs may still be in flight.
+ However if (joined() && (count() == 0)) there should be no more
+ counted Jobs in flight.
+ */
+ bool joined() const
+ {
+ std::lock_guard lock {mutex_};
+ return waitForJobs_;
+ }
+};
+
+} // ripple
+
+#endif // RIPPLE_CORE_JOB_COUNTER_H_INCLUDED
diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h
index 81fbd3d050..139fcd618a 100644
--- a/src/ripple/core/JobQueue.h
+++ b/src/ripple/core/JobQueue.h
@@ -22,6 +22,7 @@
#include
#include
+#include
#include
#include
#include
@@ -110,8 +111,35 @@ public:
Stoppable& parent, beast::Journal journal, Logs& logs);
~JobQueue ();
+ /** Adds a job to the JobQueue.
+
+ @param t The type of job.
+ @param name Name of the job.
+ @param func std::function with signature void (Job&). Called when the job is executed.
+ */
void addJob (JobType type, std::string const& name, JobFunction const& func);
+ /** Adds a counted job to the JobQueue.
+
+ @param t The type of job.
+ @param name Name of the job.
+ @param counter JobCounter for counting the Job.
+ @param jobHandler Lambda with signature void (Job&). Called when the job is executed.
+
+ @return true if JobHandler added, false if JobCounter is already joined.
+ */
+ template
+ bool addCountedJob (JobType type,
+ std::string const& name, JobCounter& counter, JobHandler&& jobHandler)
+ {
+ if (auto optionalCountedJob = counter.wrap (std::move (jobHandler)))
+ {
+ addJob (type, name, std::move (*optionalCountedJob));
+ return true;
+ }
+ return false;
+ }
+
/** Creates a coroutine and adds a job to the queue which will run it.
@param t The type of job.
diff --git a/src/test/core/JobCounter_test.cpp b/src/test/core/JobCounter_test.cpp
new file mode 100644
index 0000000000..435fccecab
--- /dev/null
+++ b/src/test/core/JobCounter_test.cpp
@@ -0,0 +1,122 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2017 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace ripple {
+
+//------------------------------------------------------------------------------
+
+class JobCounter_test : public beast::unit_test::suite
+{
+ void testWrap()
+ {
+ // Verify reference counting.
+ JobCounter jobCounter;
+ BEAST_EXPECT (jobCounter.count() == 0);
+ {
+ auto wrapped1 = jobCounter.wrap ([] (Job&) {});
+ BEAST_EXPECT (jobCounter.count() == 1);
+
+ // wrapped1 should be callable with a Job.
+ {
+ Job j;
+ (*wrapped1)(j);
+ }
+ {
+ // Copy should increase reference count.
+ auto wrapped2 (wrapped1);
+ BEAST_EXPECT (jobCounter.count() == 2);
+ {
+ // Move should increase reference count.
+ auto wrapped3 (std::move(wrapped2));
+ BEAST_EXPECT (jobCounter.count() == 3);
+ {
+ // An additional Job also increases count.
+ auto wrapped4 = jobCounter.wrap ([] (Job&) {});
+ BEAST_EXPECT (jobCounter.count() == 4);
+ }
+ BEAST_EXPECT (jobCounter.count() == 3);
+ }
+ BEAST_EXPECT (jobCounter.count() == 2);
+ }
+ BEAST_EXPECT (jobCounter.count() == 1);
+ }
+ BEAST_EXPECT (jobCounter.count() == 0);
+
+ // Join with 0 count should not stall.
+ jobCounter.join();
+
+ // Wrapping a Job after join() should return boost::none.
+ BEAST_EXPECT (jobCounter.wrap ([] (Job&) {}) == boost::none);
+ }
+
+ void testWaitOnJoin()
+ {
+ // Verify reference counting.
+ JobCounter jobCounter;
+ BEAST_EXPECT (jobCounter.count() == 0);
+
+ auto job = (jobCounter.wrap ([] (Job&) {}));
+ BEAST_EXPECT (jobCounter.count() == 1);
+
+ // Calling join() now should stall, so do it on a different thread.
+ std::atomic threadExited {false};
+ std::thread localThread ([&jobCounter, &threadExited] ()
+ {
+ // Should stall after calling join.
+ jobCounter.join();
+ threadExited.store (true);
+ });
+
+ // Wait for the thread to call jobCounter.join().
+ while (! jobCounter.joined());
+
+ // The thread should still be active after waiting a millisecond.
+ // This is not a guarantee that join() stalled the thread, but it
+ // improves confidence.
+ using namespace std::chrono_literals;
+ std::this_thread::sleep_for (1ms);
+ BEAST_EXPECT (threadExited == false);
+
+ // Destroy the Job and expect the thread to exit (asynchronously).
+ job = boost::none;
+ BEAST_EXPECT (jobCounter.count() == 0);
+
+ // Wait for the thread to exit.
+ while (threadExited == false);
+ localThread.join();
+ }
+
+public:
+ void run()
+ {
+ testWrap();
+ testWaitOnJoin();
+ }
+};
+
+BEAST_DEFINE_TESTSUITE(JobCounter, core, ripple);
+
+}
diff --git a/src/test/unity/core_test_unity.cpp b/src/test/unity/core_test_unity.cpp
index a143d58e5e..b7f77069bc 100644
--- a/src/test/unity/core_test_unity.cpp
+++ b/src/test/unity/core_test_unity.cpp
@@ -21,6 +21,7 @@
#include
#include
#include
+#include
#include
#include
#include