diff --git a/Papers/NodeStoreRefactoringCaseStudy.pdf b/Papers/NodeStoreRefactoringCaseStudy.pdf
new file mode 100644
index 000000000..6cde8a2ee
Binary files /dev/null and b/Papers/NodeStoreRefactoringCaseStudy.pdf differ
diff --git a/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj b/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj
index a7f7a585f..227c1f85f 100644
--- a/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj
+++ b/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj
@@ -92,6 +92,7 @@
+
@@ -352,6 +353,12 @@
true
true
+
+ true
+ true
+ true
+ true
+
true
diff --git a/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj.filters b/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj.filters
index 5954681eb..709802caa 100644
--- a/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj.filters
+++ b/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj.filters
@@ -728,6 +728,9 @@
beast_core\text
+
+ beast_basics\threads
+
@@ -1135,6 +1138,9 @@
beast_core\text
+
+ beast_basics\threads
+
diff --git a/Subtrees/beast/modules/beast_basics/beast_basics.cpp b/Subtrees/beast/modules/beast_basics/beast_basics.cpp
index 37eeb256c..099cbe602 100644
--- a/Subtrees/beast/modules/beast_basics/beast_basics.cpp
+++ b/Subtrees/beast/modules/beast_basics/beast_basics.cpp
@@ -56,6 +56,7 @@ namespace beast
#include "threads/beast_ReadWriteMutex.cpp"
#include "threads/beast_ThreadGroup.cpp"
#include "threads/beast_ThreadWithCallQueue.cpp"
+#include "threads/beast_Workers.cpp"
}
diff --git a/Subtrees/beast/modules/beast_basics/beast_basics.h b/Subtrees/beast/modules/beast_basics/beast_basics.h
index 782bfa387..d9a5b78ba 100644
--- a/Subtrees/beast/modules/beast_basics/beast_basics.h
+++ b/Subtrees/beast/modules/beast_basics/beast_basics.h
@@ -268,6 +268,7 @@ namespace beast
#include "threads/beast_ManualCallQueue.h"
#include "threads/beast_ParallelFor.h"
#include "threads/beast_ThreadWithCallQueue.h"
+#include "threads/beast_Workers.h"
}
diff --git a/Subtrees/beast/modules/beast_basics/threads/beast_ParallelFor.h b/Subtrees/beast/modules/beast_basics/threads/beast_ParallelFor.h
index e2e56fdb7..3903ce445 100644
--- a/Subtrees/beast/modules/beast_basics/threads/beast_ParallelFor.h
+++ b/Subtrees/beast/modules/beast_basics/threads/beast_ParallelFor.h
@@ -71,11 +71,6 @@ public:
*/
int getNumberOfThreads () const;
- template
- void operator () (int numberOfIterations, T1 t1)
- {
- }
-
/** Execute parallel for loop.
Functor is called once for each value in the range
diff --git a/Subtrees/beast/modules/beast_basics/threads/beast_Semaphore.cpp b/Subtrees/beast/modules/beast_basics/threads/beast_Semaphore.cpp
index 9ca4d986e..bf97ee516 100644
--- a/Subtrees/beast/modules/beast_basics/threads/beast_Semaphore.cpp
+++ b/Subtrees/beast/modules/beast_basics/threads/beast_Semaphore.cpp
@@ -17,23 +17,6 @@
*/
//==============================================================================
-Semaphore::WaitingThread::WaitingThread ()
- : m_event (false) // auto-reset
-{
-}
-
-void Semaphore::WaitingThread::wait ()
-{
- m_event.wait ();
-}
-
-void Semaphore::WaitingThread::signal ()
-{
- m_event.signal ();
-}
-
-//==============================================================================
-
Semaphore::Semaphore (int initialCount)
: m_counter (initialCount)
{
@@ -75,8 +58,10 @@ void Semaphore::signal (int amount)
}
}
-void Semaphore::wait ()
+bool Semaphore::wait (int timeOutMilliseconds)
{
+ bool signaled = true;
+
// Always prepare the WaitingThread object first, either
// from the delete list or through a new allocation.
//
@@ -107,11 +92,34 @@ void Semaphore::wait ()
if (waitingThread != nullptr)
{
// Yes so do it.
- waitingThread->wait ();
+ signaled = waitingThread->wait (timeOutMilliseconds);
// If the wait is satisfied, then we've been taken off the
// waiting list so put waitingThread back in the delete list.
//
m_deleteList.push_front (waitingThread);
}
+
+ return signaled;
}
+
+//------------------------------------------------------------------------------
+
+Semaphore::WaitingThread::WaitingThread ()
+ : m_event (false) // auto-reset
+{
+}
+
+bool Semaphore::WaitingThread::wait (int timeOutMilliseconds)
+{
+ return m_event.wait (timeOutMilliseconds);
+}
+
+void Semaphore::WaitingThread::signal ()
+{
+ m_event.signal ();
+}
+
+//------------------------------------------------------------------------------
+
+// VFALCO TODO Unit Tests!
diff --git a/Subtrees/beast/modules/beast_basics/threads/beast_Semaphore.h b/Subtrees/beast/modules/beast_basics/threads/beast_Semaphore.h
index de78a0a1e..9990757ee 100644
--- a/Subtrees/beast/modules/beast_basics/threads/beast_Semaphore.h
+++ b/Subtrees/beast/modules/beast_basics/threads/beast_Semaphore.h
@@ -49,8 +49,12 @@ public:
void signal (int amount = 1);
/** Wait for a resource.
+
+ A negative time-out value means that the method will wait indefinitely.
+
+ @returns true if the event has been signalled, false if the timeout expires.
*/
- void wait ();
+ bool wait (int timeOutMilliseconds = -1);
private:
class WaitingThread
@@ -60,7 +64,7 @@ private:
public:
WaitingThread ();
- void wait ();
+ bool wait (int timeOutMilliseconds);
void signal ();
private:
diff --git a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp
new file mode 100644
index 000000000..b4fa54ec9
--- /dev/null
+++ b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp
@@ -0,0 +1,276 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of Beast: https://github.com/vinniefalco/Beast
+ Copyright 2013, Vinnie Falco
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+Workers::Workers (Callback& callback, int numberOfThreads)
+ : m_callback (callback)
+ , m_allPaused (true, true)
+ , m_semaphore (0)
+ , m_numberOfThreads (0)
+{
+ setNumberOfThreads (numberOfThreads);
+}
+
+Workers::~Workers ()
+{
+ pauseAllThreadsAndWait ();
+
+ deleteWorkers (m_everyone);
+}
+
+int Workers::getNumberOfThreads () const noexcept
+{
+ return m_numberOfThreads;
+}
+
+// VFALCO NOTE if this function is called quickly to reduce then
+// increase the number of threads, it could result in
+// more paused threads being created than expected.
+//
+void Workers::setNumberOfThreads (int numberOfThreads)
+{
+ if (m_numberOfThreads != numberOfThreads)
+ {
+ if (numberOfThreads > m_numberOfThreads)
+ {
+ // Increasing the number of working threads
+
+ int const amount = numberOfThreads - m_numberOfThreads;
+
+ for (int i = 0; i < amount; ++i)
+ {
+ // See if we can reuse a paused worker
+ Worker* worker = m_paused.pop_front ();
+
+ if (worker != nullptr)
+ {
+ // If we got here then the worker thread is at [1]
+ // This will unblock their call to wait()
+ //
+ worker->notify ();
+ }
+ else
+ {
+ worker = new Worker (*this);
+ }
+
+ m_everyone.push_front (worker);
+ }
+ }
+ else if (numberOfThreads < m_numberOfThreads)
+ {
+ // Decreasing the number of working threads
+
+ int const amount = m_numberOfThreads - numberOfThreads;
+
+ for (int i = 0; i < amount; ++i)
+ {
+ ++m_pauseCount;
+
+ // Pausing a thread counts as one "internal task"
+ m_semaphore.signal ();
+ }
+ }
+
+ m_numberOfThreads = numberOfThreads;
+ }
+}
+
+void Workers::pauseAllThreadsAndWait ()
+{
+ setNumberOfThreads (0);
+
+ m_allPaused.wait ();
+}
+
+void Workers::addTask ()
+{
+ m_semaphore.signal ();
+}
+
+void Workers::deleteWorkers (LockFreeStack & stack)
+{
+ for (;;)
+ {
+ Worker* const worker = stack.pop_front ();
+
+ if (worker != nullptr)
+ {
+ // This call blocks until the thread orderly exits
+ delete worker;
+ }
+ else
+ {
+ break;
+ }
+ }
+}
+
+//------------------------------------------------------------------------------
+
+Workers::Worker::Worker (Workers& workers)
+ : Thread ("Worker")
+ , m_workers (workers)
+{
+ startThread ();
+}
+
+Workers::Worker::~Worker ()
+{
+ stopThread ();
+}
+
+void Workers::Worker::run ()
+{
+ while (! threadShouldExit ())
+ {
+ // Increment the count of active workers, and if
+ // we are the first one then reset the "all paused" event
+ //
+ if (++m_workers.m_activeCount == 1)
+ m_workers.m_allPaused.reset ();
+
+ for (;;)
+ {
+ // Acquire a task or "internal task."
+ //
+ m_workers.m_semaphore.wait ();
+
+ // See if there's a pause request. This
+ // counts as an "internal task."
+ //
+ int pauseCount = m_workers.m_pauseCount.get ();
+
+ if (pauseCount > 0)
+ {
+ // Try to decrement
+ pauseCount = --m_workers.m_pauseCount;
+
+ if (pauseCount >= 0)
+ {
+ // We got paused
+ break;
+ }
+ else
+ {
+ // Undo our decrement
+ ++m_workers.m_pauseCount;
+ }
+ }
+
+ // We couldn't pause so we must have gotten
+ // unblocked in order to process a task.
+ //
+ m_workers.m_callback.processTask ();
+ }
+
+ // Any worker that goes into the paused list must
+ // guarantee that it will eventually block on its
+ // event object.
+ //
+ m_workers.m_paused.push_front (this);
+
+ // Decrement the count of active workers, and if we
+ // are the last one then signal the "all paused" event.
+ //
+ if (--m_workers.m_activeCount == 0)
+ m_workers.m_allPaused.signal ();
+
+ // [1] We will be here when the paused list is popped
+ //
+ // We block on our event object, a requirement of being
+ // put into the paused list.
+ //
+ // This will get signaled on either a reactivate or a stopThread()
+ //
+ wait ();
+ }
+}
+
+//------------------------------------------------------------------------------
+
+class WorkersTests : public UnitTest
+{
+public:
+ WorkersTests () : UnitTest ("Workers", "beast")
+ {
+ }
+
+ struct TestCallback : Workers::Callback
+ {
+ explicit TestCallback (int count_)
+ : finished (false, count_ == 0)
+ , count (count_)
+ {
+ }
+
+ void processTask ()
+ {
+ if (--count == 0)
+ finished.signal ();
+ }
+
+ WaitableEvent finished;
+ Atomic count;
+ };
+
+ void testThreads (int const threadCount)
+ {
+ String s;
+ s << "threadCount = " << String (threadCount);
+ beginTestCase (s);
+
+ TestCallback cb (threadCount);
+
+ Workers w (cb, 0);
+ expect (w.getNumberOfThreads () == 0);
+
+ w.setNumberOfThreads (threadCount);
+ expect (w.getNumberOfThreads () == threadCount);
+
+ for (int i = 0; i < threadCount; ++i)
+ w.addTask ();
+
+ // 10 seconds should be enough to finish on any system
+ //
+ bool signaled = cb.finished.wait (10 * 1000);
+
+ expect (signaled, "timed out");
+
+ w.pauseAllThreadsAndWait ();
+
+ int const count (cb.count.get ());
+
+ expectEquals (count, 0);
+ }
+
+ void runTest ()
+ {
+ testThreads (0);
+ testThreads (1);
+ testThreads (2);
+ testThreads (4);
+ testThreads (16);
+ testThreads (64);
+ testThreads (128);
+ testThreads (256);
+ testThreads (512);
+ }
+};
+
+static WorkersTests workersTests;
diff --git a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h
new file mode 100644
index 000000000..4d2d9e9a9
--- /dev/null
+++ b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h
@@ -0,0 +1,122 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of Beast: https://github.com/vinniefalco/Beast
+ Copyright 2013, Vinnie Falco
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#ifndef BEAST_WORKERS_H_INCLUDED
+#define BEAST_WORKERS_H_INCLUDED
+
+/** A group of threads that process tasks.
+*/
+class Workers
+{
+public:
+ /** Called to perform tasks as needed. */
+ struct Callback
+ {
+ /** Perform a task.
+ The call is made on a thread owned by Workers.
+ */
+ virtual void processTask () = 0;
+ };
+
+ /** Create the object.
+
+ A number of initial threads may be optionally specified. The
+ default is to create one thread per CPU.
+ */
+ explicit Workers (Callback& callback, int numberOfThreads = SystemStats::getNumCpus ());
+
+ ~Workers ();
+
+ /** Retrieve the desired number of threads.
+
+ This just returns the number of active threads that were requested. If
+ there was a recent call to setNumberOfThreads, the actual number of active
+ threads may be temporarily different from what was last requested.
+
+ @note This function is not thread-safe.
+ */
+ int getNumberOfThreads () const noexcept;
+
+ /** Set the desired number of threads.
+ @note This function is not thread-safe.
+ */
+ void setNumberOfThreads (int numberOfThreads);
+
+ /** Pause all threads and wait until they are paused.
+
+ If a thread is processing a task it will pause as soon as the task
+ completes. There may still be tasks signaled even after all threads
+ have paused.
+
+ @note This function is not thread-safe.
+ */
+ void pauseAllThreadsAndWait ();
+
+ /** Increment the number of tasks.
+ The callback will be called for each task.
+ @note This function is thread-safe.
+ */
+ void addTask ();
+
+ //--------------------------------------------------------------------------
+
+private:
+ struct PausedTag { };
+
+ /* A Worker executes tasks on its provided thread.
+
+ These are the states:
+
+ Active: Running the task processing loop.
+ Idle: Active, but blocked on waiting for a task.
+ Pausd: Blocked waiting to exit or become active.
+ */
+
+ class Worker
+ : public LockFreeStack ::Node
+ , public LockFreeStack ::Node
+ , public Thread
+ {
+ public:
+ explicit Worker (Workers& workers);
+
+ ~Worker ();
+
+ private:
+ void run ();
+
+ private:
+ Workers& m_workers;
+ };
+
+private:
+ static void deleteWorkers (LockFreeStack & stack);
+
+private:
+ Callback& m_callback;
+ WaitableEvent m_allPaused; // signaled when all threads paused
+ Semaphore m_semaphore; // each pending task is 1 resource
+ int m_numberOfThreads; // how many we want active now
+ Atomic m_activeCount; // to know when all are paused
+ Atomic m_pauseCount; // how many threads need to pause now
+ LockFreeStack m_everyone; // holds all created workers
+ LockFreeStack m_paused; // holds just paused workers
+};
+
+#endif
diff --git a/Subtrees/beast/modules/beast_core/diagnostic/beast_UnitTest.h b/Subtrees/beast/modules/beast_core/diagnostic/beast_UnitTest.h
index 656ce7ae1..41f7215d3 100644
--- a/Subtrees/beast/modules/beast_core/diagnostic/beast_UnitTest.h
+++ b/Subtrees/beast/modules/beast_core/diagnostic/beast_UnitTest.h
@@ -253,8 +253,8 @@ public:
/** Compares two values, and if they don't match, prints out a message containing the
expected and actual result values.
*/
- template
- void expectEquals (ValueType actual, ValueType expected, String failureMessage = String::empty)
+ template
+ void expectEquals (ActualType actual, ExpectedType expected, String failureMessage = String::empty)
{
const bool result = (actual == expected);
diff --git a/Subtrees/beast/modules/beast_core/native/beast_posix_SharedCode.h b/Subtrees/beast/modules/beast_core/native/beast_posix_SharedCode.h
index 222490176..fd9d28b97 100644
--- a/Subtrees/beast/modules/beast_core/native/beast_posix_SharedCode.h
+++ b/Subtrees/beast/modules/beast_core/native/beast_posix_SharedCode.h
@@ -54,7 +54,7 @@ void CriticalSection::exit() const noexcept
//==============================================================================
-WaitableEvent::WaitableEvent (const bool useManualReset) noexcept
+WaitableEvent::WaitableEvent (const bool useManualReset, bool initiallySignaled) noexcept
: triggered (false), manualReset (useManualReset)
{
pthread_cond_init (&condition, 0);
@@ -65,6 +65,9 @@ WaitableEvent::WaitableEvent (const bool useManualReset) noexcept
pthread_mutexattr_setprotocol (&atts, PTHREAD_PRIO_INHERIT);
#endif
pthread_mutex_init (&mutex, &atts);
+
+ if (initiallySignaled)
+ signal ();
}
WaitableEvent::~WaitableEvent() noexcept
diff --git a/Subtrees/beast/modules/beast_core/native/beast_win32_Threads.cpp b/Subtrees/beast/modules/beast_core/native/beast_win32_Threads.cpp
index 6c9f455a7..52438b5e8 100644
--- a/Subtrees/beast/modules/beast_core/native/beast_win32_Threads.cpp
+++ b/Subtrees/beast/modules/beast_core/native/beast_win32_Threads.cpp
@@ -80,9 +80,11 @@ void CriticalSection::exit() const noexcept
}
//==============================================================================
-WaitableEvent::WaitableEvent (const bool manualReset) noexcept
+WaitableEvent::WaitableEvent (const bool manualReset, bool initiallySignaled) noexcept
: internal (CreateEvent (0, manualReset ? TRUE : FALSE, FALSE, 0))
{
+ if (initiallySignaled)
+ signal ();
}
WaitableEvent::~WaitableEvent() noexcept
diff --git a/Subtrees/beast/modules/beast_core/text/beast_String.cpp b/Subtrees/beast/modules/beast_core/text/beast_String.cpp
index f55bb510d..5d4648c55 100644
--- a/Subtrees/beast/modules/beast_core/text/beast_String.cpp
+++ b/Subtrees/beast/modules/beast_core/text/beast_String.cpp
@@ -5,7 +5,7 @@
Portions of this file are from JUCE.
Copyright (c) 2013 - Raw Material Software Ltd.
- Please visit http://www.juce.com
+ Please visit http://www.beast.com
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -131,8 +131,8 @@ public:
if (start.getAddress() == nullptr || start.isEmpty())
return getEmpty();
- const size_t numBytes = (size_t)( reinterpret_cast (end.getAddress())
- - reinterpret_cast (start.getAddress()));
+ const size_t numBytes = (size_t) (reinterpret_cast (end.getAddress())
+ - reinterpret_cast (start.getAddress()));
const CharPointerType dest (createUninitialisedBytes (numBytes + sizeof (CharType)));
memcpy (dest.getAddress(), start, numBytes);
dest.getAddress()[numBytes / sizeof (CharType)] = 0;
@@ -358,88 +358,54 @@ String String::charToString (const beast_wchar character)
//==============================================================================
namespace NumberToStringConverters
{
+ template
+ static inline char* printDigits (char* t, Type v) noexcept
+ {
+ *--t = 0;
+
+ do
+ {
+ *--t = '0' + (char) (v % 10);
+ v /= 10;
+
+ } while (v > 0);
+
+ return t;
+ }
+
// pass in a pointer to the END of a buffer..
static char* numberToString (char* t, const int64 n) noexcept
{
- if (n > 0)
- {
- *--t = 0;
- uint64 v = static_cast (n);
-
- do
- {
- *--t = (char) ('0' + (int) (v % 10));
- v /= 10;
-
- }
- while (v > 0);
- }
- else
- {
- *--t = 0;
- uint64 v = ((uint64)(-(n + 1)) + 1);
-
- do
- {
- *--t = (char) ('0' + (int) (v % 10));
- v /= 10;
-
- }
- while (v > 0);
-
- *--t = '-';
- }
+ if (n >= 0)
+ return printDigits (t, static_cast (n));
+ // NB: this needs to be careful not to call -std::numeric_limits::min(),
+ // which has undefined behaviour
+ t = printDigits (t, static_cast (-(n + 1)) + 1);
+ *--t = '-';
return t;
}
static char* numberToString (char* t, uint64 v) noexcept
{
- *--t = 0;
-
- do
- {
- *--t = (char) ('0' + (int) (v % 10));
- v /= 10;
-
- } while (v > 0);
-
- return t;
+ return printDigits (t, v);
}
static char* numberToString (char* t, const int n) noexcept
{
- if (n == (int) 0x80000000) // (would cause an overflow)
- return numberToString (t, (int64) n);
-
- *--t = 0;
- int v = abs (n);
-
- do
- {
- *--t = (char) ('0' + (v % 10));
- v /= 10;
-
- } while (v > 0);
-
- if (n < 0)
- *--t = '-';
+ if (n >= 0)
+ return printDigits (t, static_cast (n));
+ // NB: this needs to be careful not to call -std::numeric_limits::min(),
+ // which has undefined behaviour
+ t = printDigits (t, static_cast (-(n + 1)) + 1);
+ *--t = '-';
return t;
}
static char* numberToString (char* t, unsigned int v) noexcept
{
- *--t = 0;
-
- do
- {
- *--t = (char) ('0' + (v % 10));
- v /= 10;
-
- } while (v > 0);
-
- return t;
+ return printDigits (t, v);
}
static char* doubleToString (char* buffer, const int numChars, double n, int numDecPlaces, size_t& len) noexcept
@@ -492,7 +458,6 @@ namespace NumberToStringConverters
char buffer [32];
char* const end = buffer + numElementsInArray (buffer);
char* const start = numberToString (end, number);
-
return StringHolder::createFromFixedLength (start, (size_t) (end - start - 1));
}
@@ -2094,7 +2059,9 @@ String String::fromUTF8 (const char* const buffer, int bufferSizeBytes)
class StringTests : public UnitTest
{
public:
- StringTests() : UnitTest ("String", "beast") { }
+ StringTests() : UnitTest ("String", "beast")
+ {
+ }
template
struct TestUTFConversion
@@ -2236,6 +2203,10 @@ public:
expect (String ((int64) -1234).getLargeIntValue() == -1234);
expect (String (-1234.56).getDoubleValue() == -1234.56);
expect (String (-1234.56f).getFloatValue() == -1234.56f);
+ expect (String (std::numeric_limits::max()).getIntValue() == std::numeric_limits::max());
+ expect (String (std::numeric_limits::min()).getIntValue() == std::numeric_limits::min());
+ expect (String (std::numeric_limits::max()).getLargeIntValue() == std::numeric_limits::max());
+ expect (String (std::numeric_limits::min()).getLargeIntValue() == std::numeric_limits::min());
expect (("xyz" + s).getTrailingIntValue() == s.getIntValue());
expect (s.getHexValue32() == 0x12345678);
expect (s.getHexValue64() == (int64) 0x12345678);
@@ -2243,6 +2214,24 @@ public:
expect (String::toHexString ((int64) 0x1234abcd).equalsIgnoreCase ("1234abcd"));
expect (String::toHexString ((short) 0x12ab).equalsIgnoreCase ("12ab"));
+ expectEquals (String (int (0)), "0");
+ expectEquals (String (short (0)), "0");
+ expectEquals (String (int64 (0)), "0");
+ expectEquals (String ((unsigned int) 0), "0");
+ expectEquals (String ((unsigned short) 0), "0");
+ expectEquals (String (uint64 (0)), "0");
+
+ expectEquals (String (int (-1)), "-1");
+ expectEquals (String (short (-1)), "-1");
+ expectEquals (String (int64 (-1)), "-1");
+
+ expectEquals (String (int (1)), "1");
+ expectEquals (String (short (1)), "1");
+ expectEquals (String (int64 (1)), "1");
+ expectEquals (String ((unsigned int) 1), "1");
+ expectEquals (String ((unsigned short) 1), "1");
+ expectEquals (String (uint64 (1)), "1");
+
unsigned char data[] = { 1, 2, 3, 4, 0xa, 0xb, 0xc, 0xd };
expect (String::toHexString (data, 8, 0).equalsIgnoreCase ("010203040a0b0c0d"));
expect (String::toHexString (data, 8, 1).equalsIgnoreCase ("01 02 03 04 0a 0b 0c 0d"));
@@ -2418,4 +2407,5 @@ public:
}
};
-static StringTests stringTests;
+static StringTests stringUnitTests;
+
diff --git a/Subtrees/beast/modules/beast_core/threads/beast_Thread.h b/Subtrees/beast/modules/beast_core/threads/beast_Thread.h
index e0d28b64a..2a3027264 100644
--- a/Subtrees/beast/modules/beast_core/threads/beast_Thread.h
+++ b/Subtrees/beast/modules/beast_core/threads/beast_Thread.h
@@ -113,7 +113,7 @@ public:
value in here will wait forever.
@see signalThreadShouldExit, threadShouldExit, waitForThreadToExit, isThreadRunning
*/
- void stopThread (int timeOutMilliseconds);
+ void stopThread (int timeOutMilliseconds = -1);
//==============================================================================
/** Returns true if the thread is currently active */
@@ -150,7 +150,7 @@ public:
is less than zero, it will wait forever.
@returns true if the thread exits, or false if the timeout expires first.
*/
- bool waitForThreadToExit (int timeOutMilliseconds) const;
+ bool waitForThreadToExit (int timeOutMilliseconds = -1) const;
//==============================================================================
/** Changes the thread's priority.
@@ -205,7 +205,7 @@ public:
@returns true if the event has been signalled, false if the timeout expires.
*/
- bool wait (int timeOutMilliseconds) const;
+ bool wait (int timeOutMilliseconds = -1) const;
/** Wakes up the thread.
diff --git a/Subtrees/beast/modules/beast_core/threads/beast_WaitableEvent.h b/Subtrees/beast/modules/beast_core/threads/beast_WaitableEvent.h
index 1a01d7e5c..22e610481 100644
--- a/Subtrees/beast/modules/beast_core/threads/beast_WaitableEvent.h
+++ b/Subtrees/beast/modules/beast_core/threads/beast_WaitableEvent.h
@@ -44,8 +44,11 @@ public:
@param manualReset If this is false, the event will be reset automatically when the wait()
method is called. If manualReset is true, then once the event is signalled,
the only way to reset it will be by calling the reset() method.
+
+ @param initiallySignaled If this is true then the event will be signaled when
+ the constructor returns.
*/
- explicit WaitableEvent (bool manualReset = false) noexcept;
+ explicit WaitableEvent (bool manualReset = false, bool initiallySignaled = false) noexcept;
/** Destructor.
diff --git a/Subtrees/beast/modules/beast_db/keyvalue/beast_KeyvaDB.cpp b/Subtrees/beast/modules/beast_db/keyvalue/beast_KeyvaDB.cpp
index 2fff06655..665a82a24 100644
--- a/Subtrees/beast/modules/beast_db/keyvalue/beast_KeyvaDB.cpp
+++ b/Subtrees/beast/modules/beast_db/keyvalue/beast_KeyvaDB.cpp
@@ -708,7 +708,7 @@ public:
maxPayloadBytes = 8 * 1024
};
- KeyvaDBTests () : UnitTest ("KeyvaDB", "ripple")
+ KeyvaDBTests () : UnitTest ("KeyvaDB", "beast")
{
}
diff --git a/TODO.txt b/TODO.txt
index 99d3f9286..35b260a81 100644
--- a/TODO.txt
+++ b/TODO.txt
@@ -5,10 +5,10 @@ RIPPLE TODO
Vinnie's List: Changes day to day, descending priority
(Items marked '*' can be handled by others.)
+- Allow skipped/disabled unit tests and reporting.
- Show summary for text output of unit test results
- Make ProofOfWorkTests manual since they aren't used
-* Make everyone check GitHub Issues every day
- Do something about the throw() reporting weaknesses:
* Make sure all Sconstruct and .pro builds have debug symbols in release
* Replace all throw with beast::Throw()
@@ -40,9 +40,23 @@ David Feature:
--------------------------------------------------------------------------------
+- Consolidate databases
+
+- Figure out why we need WAL sqlite mode if we no longer use sqlite for the node store
+
- Add "skipped" field to beginTestCase() to disable a test but still record
that it was skipped in the output. Like for mdb import.
+- use beast DeadlineTimer for sweep in Application
+
+- Make SNTP Client have its own io_service
+
+- Get rid of 'ref' typedefs that really mean const&
+
+- Use secp256k1 from beast
+
+- Fix xsd/dtd line in JUnit XML output
+
- Get rid of the WriteLog() stuff in the ripple tests and make it report the
message directly to the UnitTest object. Then update the JUnit XML output
routines to also write the auxiliary messages.
diff --git a/modules/ripple_app/data/ripple_SqliteDatabase.cpp b/modules/ripple_app/data/ripple_SqliteDatabase.cpp
index 2ffcb47e9..e88e40dd6 100644
--- a/modules/ripple_app/data/ripple_SqliteDatabase.cpp
+++ b/modules/ripple_app/data/ripple_SqliteDatabase.cpp
@@ -33,8 +33,11 @@ SqliteStatement::~SqliteStatement ()
sqlite3_finalize (statement);
}
+//------------------------------------------------------------------------------
+
SqliteDatabase::SqliteDatabase (const char* host)
: Database (host)
+ , m_thread ("sqlitedb")
, mWalQ (NULL)
, walRunning (false)
{
@@ -290,9 +293,13 @@ void SqliteDatabase::doHook (const char* db, int pages)
}
if (mWalQ)
+ {
mWalQ->addJob (jtWAL, std::string ("WAL:") + mHost, BIND_TYPE (&SqliteDatabase::runWal, this));
+ }
else
- boost::thread (BIND_TYPE (&SqliteDatabase::runWal, this)).detach ();
+ {
+ m_thread.call (&SqliteDatabase::runWal, this);
+ }
}
void SqliteDatabase::runWal ()
diff --git a/modules/ripple_app/data/ripple_SqliteDatabase.h b/modules/ripple_app/data/ripple_SqliteDatabase.h
index a1bac93b5..29b4aed71 100644
--- a/modules/ripple_app/data/ripple_SqliteDatabase.h
+++ b/modules/ripple_app/data/ripple_SqliteDatabase.h
@@ -57,6 +57,8 @@ public:
int getKBUsedAll ();
private:
+ ThreadWithCallQueue m_thread;
+
sqlite3* mConnection;
// VFALCO TODO Why do we need an "aux" connection? Should just use a second SqliteDatabase object.
sqlite3* mAuxConnection;
diff --git a/modules/ripple_app/main/ripple_Application.cpp b/modules/ripple_app/main/ripple_Application.cpp
index abfd68c3f..d1ea135c8 100644
--- a/modules/ripple_app/main/ripple_Application.cpp
+++ b/modules/ripple_app/main/ripple_Application.cpp
@@ -20,6 +20,118 @@ class ApplicationImp
, LeakChecked
{
public:
+ // RAII container for a boost::asio::io_service run by beast threads
+ class IoServiceThread
+ {
+ public:
+ IoServiceThread (String const& name,
+ int expectedConcurrency,
+ int numberOfExtraThreads = 0)
+ : m_name (name)
+ , m_service (expectedConcurrency)
+ , m_work (m_service)
+ {
+ m_threads.ensureStorageAllocated (numberOfExtraThreads);
+
+ for (int i = 0; i < numberOfExtraThreads; ++i)
+ m_threads.add (new ServiceThread (m_name, m_service));
+ }
+
+ ~IoServiceThread ()
+ {
+ m_service.stop ();
+
+ // the dtor of m_threads will block until each thread exits.
+ }
+
+ // TEMPORARY HACK for compatibility with old code
+ void runExtraThreads ()
+ {
+ for (int i = 0; i < m_threads.size (); ++i)
+ m_threads [i]->start ();
+ }
+
+ // Run on the callers thread.
+ // This will block until stop is issued.
+ void run ()
+ {
+ Thread const* const currentThread (Thread::getCurrentThread());
+
+ String previousThreadName;
+
+ if (currentThread != nullptr)
+ {
+ previousThreadName = currentThread->getThreadName ();
+ }
+ else
+ {
+ // we're on the main thread
+ previousThreadName = "main"; // for vanity
+ }
+
+ Thread::setCurrentThreadName (m_name);
+
+ m_service.run ();
+
+ Thread::setCurrentThreadName (previousThreadName);
+ }
+
+ void stop ()
+ {
+ m_service.stop ();
+ }
+
+ boost::asio::io_service& getService ()
+ {
+ return m_service;
+ }
+
+ operator boost::asio::io_service& ()
+ {
+ return m_service;
+ }
+
+ private:
+ class ServiceThread : Thread
+ {
+ public:
+ explicit ServiceThread (String const& name, boost::asio::io_service& service)
+ : Thread (name)
+ , m_service (service)
+ {
+ //startThread ();
+ }
+
+ ~ServiceThread ()
+ {
+ m_service.stop ();
+
+ stopThread (-1); // wait forever
+ }
+
+ void start ()
+ {
+ startThread ();
+ }
+
+ void run ()
+ {
+ m_service.run ();
+ }
+
+ private:
+ boost::asio::io_service& m_service;
+ };
+
+ private:
+ String const m_name;
+ boost::asio::io_service m_service;
+ boost::asio::io_service::work m_work;
+ OwnedArray m_threads;
+ };
+
+ //--------------------------------------------------------------------------
+
static ApplicationImp* createInstance ()
{
return new ApplicationImp;
@@ -40,14 +152,15 @@ public:
//
: SharedSingleton (SingletonLifetime::neverDestroyed)
#endif
- , mIOService ((getConfig ().NODE_SIZE >= 2) ? 2 : 1)
- , mIOWork (mIOService)
+ , m_mainService ("io",
+ (getConfig ().NODE_SIZE >= 2) ? 2 : 1,
+ (getConfig ().NODE_SIZE >= 2) ? 1 : 0)
+ , m_auxService ("auxio", 1, 1)
, mNetOps (new NetworkOPs (&mLedgerMaster))
, m_rpcServerHandler (*mNetOps)
, mTempNodeCache ("NodeCache", 16384, 90)
, mSLECache ("LedgerEntryCache", 4096, 120)
- , mSNTPClient (mAuxService)
- , mJobQueue (mIOService)
+ , mSNTPClient (m_auxService)
// VFALCO New stuff
, m_nodeStore (NodeStore::New (
getConfig ().nodeDatabase,
@@ -61,7 +174,7 @@ public:
, mValidations (IValidations::New ())
, mUNL (UniqueNodeList::New ())
, mProofOfWorkFactory (IProofOfWorkFactory::New ())
- , mPeers (IPeers::New (mIOService))
+ , mPeers (IPeers::New (m_mainService))
, m_loadManager (ILoadManager::New ())
// VFALCO End new stuff
// VFALCO TODO replace all NULL with nullptr
@@ -73,7 +186,7 @@ public:
, mRPCDoor (NULL)
, mWSPublicDoor (NULL)
, mWSPrivateDoor (NULL)
- , mSweepTimer (mAuxService)
+ , mSweepTimer (m_auxService)
, mShutdown (false)
{
// VFALCO TODO remove these once the call is thread safe.
@@ -119,7 +232,7 @@ public:
boost::asio::io_service& getIOService ()
{
- return mIOService;
+ return m_mainService;
}
LedgerMaster& getLedgerMaster ()
@@ -258,7 +371,266 @@ public:
{
return mShutdown;
}
- void setup ();
+
+ //--------------------------------------------------------------------------
+
+ static DatabaseCon* openDatabaseCon (const char* fileName,
+ const char* dbInit[],
+ int dbCount)
+ {
+ return new DatabaseCon (fileName, dbInit, dbCount);
+ }
+
+ void initSqliteDb (int index)
+ {
+ switch (index)
+ {
+ case 0: mRpcDB = openDatabaseCon ("rpc.db", RpcDBInit, RpcDBCount); break;
+ case 1: mTxnDB = openDatabaseCon ("transaction.db", TxnDBInit, TxnDBCount); break;
+ case 2: mLedgerDB = openDatabaseCon ("ledger.db", LedgerDBInit, LedgerDBCount); break;
+ case 3: mWalletDB = openDatabaseCon ("wallet.db", WalletDBInit, WalletDBCount); break;
+ };
+ }
+
+ // VFALCO TODO Is it really necessary to init the dbs in parallel?
+ void initSqliteDbs ()
+ {
+ int const count = 4;
+
+ ThreadGroup threadGroup (count);
+ ParallelFor (threadGroup).loop (count, &ApplicationImp::initSqliteDb, this);
+ }
+
+#ifdef SIGINT
+ static void sigIntHandler (int)
+ {
+ doShutdown = true;
+ }
+#endif
+
+ // VFALCO TODO Break this function up into many small initialization segments.
+ // Or better yet refactor these initializations into RAII classes
+ // which are members of the Application object.
+ //
+ void setup ()
+ {
+ // VFALCO NOTE: 0 means use heuristics to determine the thread count.
+ mJobQueue.setThreadCount (0, getConfig ().RUN_STANDALONE);
+
+ mSweepTimer.expires_from_now (boost::posix_time::seconds (10));
+ mSweepTimer.async_wait (BIND_TYPE (&ApplicationImp::sweep, this));
+
+ m_loadManager->startThread ();
+
+ #if ! BEAST_WIN32
+ #ifdef SIGINT
+
+ if (!getConfig ().RUN_STANDALONE)
+ {
+ struct sigaction sa;
+ memset (&sa, 0, sizeof (sa));
+ sa.sa_handler = &ApplicationImp::sigIntHandler;
+ sigaction (SIGINT, &sa, NULL);
+ }
+
+ #endif
+ #endif
+
+ assert (mTxnDB == NULL);
+
+ if (!getConfig ().DEBUG_LOGFILE.empty ())
+ {
+ // Let debug messages go to the file but only WARNING or higher to regular output (unless verbose)
+ Log::setLogFile (getConfig ().DEBUG_LOGFILE);
+
+ if (Log::getMinSeverity () > lsDEBUG)
+ LogPartition::setSeverity (lsDEBUG);
+ }
+
+ if (!getConfig ().RUN_STANDALONE)
+ mSNTPClient.init (getConfig ().SNTP_SERVERS);
+
+ initSqliteDbs ();
+
+ leveldb::Options options;
+ options.create_if_missing = true;
+ options.block_cache = leveldb::NewLRUCache (getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024);
+
+ getApp().getLedgerDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
+ (getConfig ().getSize (siLgrDBCache) * 1024)));
+ getApp().getTxnDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
+ (getConfig ().getSize (siTxnDBCache) * 1024)));
+
+ mTxnDB->getDB ()->setupCheckpointing (&mJobQueue);
+ mLedgerDB->getDB ()->setupCheckpointing (&mJobQueue);
+
+ if (!getConfig ().RUN_STANDALONE)
+ updateTables ();
+
+ mFeatures->addInitialFeatures ();
+
+ if (getConfig ().START_UP == Config::FRESH)
+ {
+ WriteLog (lsINFO, Application) << "Starting new Ledger";
+
+ startNewLedger ();
+ }
+ else if ((getConfig ().START_UP == Config::LOAD) || (getConfig ().START_UP == Config::REPLAY))
+ {
+ WriteLog (lsINFO, Application) << "Loading specified Ledger";
+
+ if (!loadOldLedger (getConfig ().START_LEDGER, getConfig ().START_UP == Config::REPLAY))
+ {
+ getApp().stop ();
+ exit (-1);
+ }
+ }
+ else if (getConfig ().START_UP == Config::NETWORK)
+ {
+ // This should probably become the default once we have a stable network
+ if (!getConfig ().RUN_STANDALONE)
+ mNetOps->needNetworkLedger ();
+
+ startNewLedger ();
+ }
+ else
+ startNewLedger ();
+
+ mOrderBookDB.setup (getApp().getLedgerMaster ().getCurrentLedger ());
+
+ //
+ // Begin validation and ip maintenance.
+ // - LocalCredentials maintains local information: including identity and network connection persistence information.
+ //
+ m_localCredentials.start ();
+
+ //
+ // Set up UNL.
+ //
+ if (!getConfig ().RUN_STANDALONE)
+ getUNL ().nodeBootstrap ();
+
+ mValidations->tune (getConfig ().getSize (siValidationsSize), getConfig ().getSize (siValidationsAge));
+ m_nodeStore->tune (getConfig ().getSize (siNodeCacheSize), getConfig ().getSize (siNodeCacheAge));
+ mLedgerMaster.tune (getConfig ().getSize (siLedgerSize), getConfig ().getSize (siLedgerAge));
+ mSLECache.setTargetSize (getConfig ().getSize (siSLECacheSize));
+ mSLECache.setTargetAge (getConfig ().getSize (siSLECacheAge));
+
+ mLedgerMaster.setMinValidations (getConfig ().VALIDATION_QUORUM);
+
+ //
+ // Allow peer connections.
+ //
+ if (!getConfig ().RUN_STANDALONE)
+ {
+ try
+ {
+ mPeerDoor = PeerDoor::New (
+ getConfig ().PEER_IP,
+ getConfig ().PEER_PORT,
+ getConfig ().PEER_SSL_CIPHER_LIST,
+ m_mainService);
+ }
+ catch (const std::exception& e)
+ {
+ // Must run as directed or exit.
+ WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open peer service: %s") % e.what ());
+
+ exit (3);
+ }
+ }
+ else
+ {
+ WriteLog (lsINFO, Application) << "Peer interface: disabled";
+ }
+
+ //
+ // Allow RPC connections.
+ //
+ if (! getConfig ().getRpcIP().empty () && getConfig ().getRpcPort() != 0)
+ {
+ try
+ {
+ mRPCDoor = new RPCDoor (m_mainService, m_rpcServerHandler);
+ }
+ catch (const std::exception& e)
+ {
+ // Must run as directed or exit.
+ WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open RPC service: %s") % e.what ());
+
+ exit (3);
+ }
+ }
+ else
+ {
+ WriteLog (lsINFO, Application) << "RPC interface: disabled";
+ }
+
+ //
+ // Allow private WS connections.
+ //
+ if (!getConfig ().WEBSOCKET_IP.empty () && getConfig ().WEBSOCKET_PORT)
+ {
+ try
+ {
+ mWSPrivateDoor = new WSDoor (getConfig ().WEBSOCKET_IP, getConfig ().WEBSOCKET_PORT, false);
+ }
+ catch (const std::exception& e)
+ {
+ // Must run as directed or exit.
+ WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open private websocket service: %s") % e.what ());
+
+ exit (3);
+ }
+ }
+ else
+ {
+ WriteLog (lsINFO, Application) << "WS private interface: disabled";
+ }
+
+ //
+ // Allow public WS connections.
+ //
+ if (!getConfig ().WEBSOCKET_PUBLIC_IP.empty () && getConfig ().WEBSOCKET_PUBLIC_PORT)
+ {
+ try
+ {
+ mWSPublicDoor = new WSDoor (getConfig ().WEBSOCKET_PUBLIC_IP, getConfig ().WEBSOCKET_PUBLIC_PORT, true);
+ }
+ catch (const std::exception& e)
+ {
+ // Must run as directed or exit.
+ WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open public websocket service: %s") % e.what ());
+
+ exit (3);
+ }
+ }
+ else
+ {
+ WriteLog (lsINFO, Application) << "WS public interface: disabled";
+ }
+
+ //
+ // Begin connecting to network.
+ //
+ if (!getConfig ().RUN_STANDALONE)
+ mPeers->start ();
+
+ if (getConfig ().RUN_STANDALONE)
+ {
+ WriteLog (lsWARNING, Application) << "Running in standalone mode";
+
+ mNetOps->setStandAlone ();
+ }
+ else
+ {
+ // VFALCO NOTE the state timer resets the deadlock detector.
+ //
+ mNetOps->setStateTimer ();
+ }
+ }
+
+
void run ();
void stop ();
void sweep ();
@@ -270,13 +642,12 @@ private:
bool loadOldLedger (const std::string&, bool);
private:
- boost::asio::io_service mIOService;
- boost::asio::io_service mAuxService;
- // The lifetime of the io_service::work object informs the io_service
- // of when the work starts and finishes. io_service::run() will not exit
- // while the work object exists.
- //
- boost::asio::io_service::work mIOWork;
+ IoServiceThread m_mainService;
+ IoServiceThread m_auxService;
+
+ //boost::asio::io_service mIOService;
+ //boost::asio::io_service mAuxService;
+ //boost::asio::io_service::work mIOWork;
MasterLockType mMasterLock;
@@ -330,281 +701,24 @@ void ApplicationImp::stop ()
WriteLog (lsINFO, Application) << "Received shutdown request";
StopSustain ();
mShutdown = true;
- mIOService.stop ();
+ m_mainService.stop ();
m_nodeStore = nullptr;
mValidations->flush ();
- mAuxService.stop ();
+ m_auxService.stop ();
mJobQueue.shutdown ();
- WriteLog (lsINFO, Application) << "Stopped: " << mIOService.stopped ();
+ //WriteLog (lsINFO, Application) << "Stopped: " << mIOService.stopped
+
mShutdown = false;
}
-static void InitDB (DatabaseCon** dbCon, const char* fileName, const char* dbInit[], int dbCount)
-{
- *dbCon = new DatabaseCon (fileName, dbInit, dbCount);
-}
-
-#ifdef SIGINT
-void sigIntHandler (int)
-{
- doShutdown = true;
-}
-#endif
-
-// VFALCO TODO Figure this out it looks like the wrong tool
-static void runAux (boost::asio::io_service& svc)
-{
- setCallingThreadName ("aux");
- svc.run ();
-}
-
-static void runIO (boost::asio::io_service& io)
-{
- setCallingThreadName ("io");
- io.run ();
-}
-
-// VFALCO TODO Break this function up into many small initialization segments.
-// Or better yet refactor these initializations into RAII classes
-// which are members of the Application object.
-//
-void ApplicationImp::setup ()
-{
- // VFALCO NOTE: 0 means use heuristics to determine the thread count.
- mJobQueue.setThreadCount (0, getConfig ().RUN_STANDALONE);
-
- mSweepTimer.expires_from_now (boost::posix_time::seconds (10));
- mSweepTimer.async_wait (BIND_TYPE (&ApplicationImp::sweep, this));
-
- m_loadManager->startThread ();
-
-#if ! BEAST_WIN32
-#ifdef SIGINT
-
- if (!getConfig ().RUN_STANDALONE)
- {
- struct sigaction sa;
- memset (&sa, 0, sizeof (sa));
- sa.sa_handler = sigIntHandler;
- sigaction (SIGINT, &sa, NULL);
- }
-
-#endif
-#endif
-
- assert (mTxnDB == NULL);
-
- if (!getConfig ().DEBUG_LOGFILE.empty ())
- {
- // Let debug messages go to the file but only WARNING or higher to regular output (unless verbose)
- Log::setLogFile (getConfig ().DEBUG_LOGFILE);
-
- if (Log::getMinSeverity () > lsDEBUG)
- LogPartition::setSeverity (lsDEBUG);
- }
-
- boost::thread (BIND_TYPE (runAux, boost::ref (mAuxService))).detach ();
-
- if (!getConfig ().RUN_STANDALONE)
- mSNTPClient.init (getConfig ().SNTP_SERVERS);
-
- //
- // Construct databases.
- //
- boost::thread t1 (BIND_TYPE (&InitDB, &mRpcDB, "rpc.db", RpcDBInit, RpcDBCount));
- boost::thread t2 (BIND_TYPE (&InitDB, &mTxnDB, "transaction.db", TxnDBInit, TxnDBCount));
- boost::thread t3 (BIND_TYPE (&InitDB, &mLedgerDB, "ledger.db", LedgerDBInit, LedgerDBCount));
- boost::thread t4 (BIND_TYPE (&InitDB, &mWalletDB, "wallet.db", WalletDBInit, WalletDBCount));
- t1.join ();
- t2.join ();
- t3.join ();
- t4.join ();
-
- leveldb::Options options;
- options.create_if_missing = true;
- options.block_cache = leveldb::NewLRUCache (getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024);
-
- getApp().getLedgerDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
- (getConfig ().getSize (siLgrDBCache) * 1024)));
- getApp().getTxnDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
- (getConfig ().getSize (siTxnDBCache) * 1024)));
-
- mTxnDB->getDB ()->setupCheckpointing (&mJobQueue);
- mLedgerDB->getDB ()->setupCheckpointing (&mJobQueue);
-
- if (!getConfig ().RUN_STANDALONE)
- updateTables ();
-
- mFeatures->addInitialFeatures ();
-
- if (getConfig ().START_UP == Config::FRESH)
- {
- WriteLog (lsINFO, Application) << "Starting new Ledger";
-
- startNewLedger ();
- }
- else if ((getConfig ().START_UP == Config::LOAD) || (getConfig ().START_UP == Config::REPLAY))
- {
- WriteLog (lsINFO, Application) << "Loading specified Ledger";
-
- if (!loadOldLedger (getConfig ().START_LEDGER, getConfig ().START_UP == Config::REPLAY))
- {
- getApp().stop ();
- exit (-1);
- }
- }
- else if (getConfig ().START_UP == Config::NETWORK)
- {
- // This should probably become the default once we have a stable network
- if (!getConfig ().RUN_STANDALONE)
- mNetOps->needNetworkLedger ();
-
- startNewLedger ();
- }
- else
- startNewLedger ();
-
- mOrderBookDB.setup (getApp().getLedgerMaster ().getCurrentLedger ());
-
- //
- // Begin validation and ip maintenance.
- // - LocalCredentials maintains local information: including identity and network connection persistence information.
- //
- m_localCredentials.start ();
-
- //
- // Set up UNL.
- //
- if (!getConfig ().RUN_STANDALONE)
- getUNL ().nodeBootstrap ();
-
- mValidations->tune (getConfig ().getSize (siValidationsSize), getConfig ().getSize (siValidationsAge));
- m_nodeStore->tune (getConfig ().getSize (siNodeCacheSize), getConfig ().getSize (siNodeCacheAge));
- mLedgerMaster.tune (getConfig ().getSize (siLedgerSize), getConfig ().getSize (siLedgerAge));
- mSLECache.setTargetSize (getConfig ().getSize (siSLECacheSize));
- mSLECache.setTargetAge (getConfig ().getSize (siSLECacheAge));
-
- mLedgerMaster.setMinValidations (getConfig ().VALIDATION_QUORUM);
-
- //
- // Allow peer connections.
- //
- if (!getConfig ().RUN_STANDALONE)
- {
- try
- {
- mPeerDoor = PeerDoor::New (
- getConfig ().PEER_IP,
- getConfig ().PEER_PORT,
- getConfig ().PEER_SSL_CIPHER_LIST,
- mIOService);
- }
- catch (const std::exception& e)
- {
- // Must run as directed or exit.
- WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open peer service: %s") % e.what ());
-
- exit (3);
- }
- }
- else
- {
- WriteLog (lsINFO, Application) << "Peer interface: disabled";
- }
-
- //
- // Allow RPC connections.
- //
- if (! getConfig ().getRpcIP().empty () && getConfig ().getRpcPort() != 0)
- {
- try
- {
- mRPCDoor = new RPCDoor (mIOService, m_rpcServerHandler);
- }
- catch (const std::exception& e)
- {
- // Must run as directed or exit.
- WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open RPC service: %s") % e.what ());
-
- exit (3);
- }
- }
- else
- {
- WriteLog (lsINFO, Application) << "RPC interface: disabled";
- }
-
- //
- // Allow private WS connections.
- //
- if (!getConfig ().WEBSOCKET_IP.empty () && getConfig ().WEBSOCKET_PORT)
- {
- try
- {
- mWSPrivateDoor = WSDoor::createWSDoor (getConfig ().WEBSOCKET_IP, getConfig ().WEBSOCKET_PORT, false);
- }
- catch (const std::exception& e)
- {
- // Must run as directed or exit.
- WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open private websocket service: %s") % e.what ());
-
- exit (3);
- }
- }
- else
- {
- WriteLog (lsINFO, Application) << "WS private interface: disabled";
- }
-
- //
- // Allow public WS connections.
- //
- if (!getConfig ().WEBSOCKET_PUBLIC_IP.empty () && getConfig ().WEBSOCKET_PUBLIC_PORT)
- {
- try
- {
- mWSPublicDoor = WSDoor::createWSDoor (getConfig ().WEBSOCKET_PUBLIC_IP, getConfig ().WEBSOCKET_PUBLIC_PORT, true);
- }
- catch (const std::exception& e)
- {
- // Must run as directed or exit.
- WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open public websocket service: %s") % e.what ());
-
- exit (3);
- }
- }
- else
- {
- WriteLog (lsINFO, Application) << "WS public interface: disabled";
- }
-
- //
- // Begin connecting to network.
- //
- if (!getConfig ().RUN_STANDALONE)
- mPeers->start ();
-
- if (getConfig ().RUN_STANDALONE)
- {
- WriteLog (lsWARNING, Application) << "Running in standalone mode";
-
- mNetOps->setStandAlone ();
- }
- else
- {
- // VFALCO NOTE the state timer resets the deadlock detector.
- //
- mNetOps->setStateTimer ();
- }
-}
-
void ApplicationImp::run ()
{
- if (getConfig ().NODE_SIZE >= 2)
- {
- boost::thread (BIND_TYPE (runIO, boost::ref (mIOService))).detach ();
- }
+ // VFALCO TODO The unit tests crash if we try to
+ // run these threads in the IoService constructor
+ // so this hack makes them start later.
+ //
+ m_mainService.runExtraThreads ();
if (!getConfig ().RUN_STANDALONE)
{
@@ -614,7 +728,7 @@ void ApplicationImp::run ()
getApp().getLoadManager ().activateDeadlockDetector ();
}
- mIOService.run (); // This blocks
+ m_mainService.run (); // This blocks until the io_service is stopped.
if (mWSPublicDoor)
mWSPublicDoor->stop ();
diff --git a/modules/ripple_app/main/ripple_Main.cpp b/modules/ripple_app/main/ripple_Main.cpp
index 0753e05de..12ca64581 100644
--- a/modules/ripple_app/main/ripple_Main.cpp
+++ b/modules/ripple_app/main/ripple_Main.cpp
@@ -145,7 +145,20 @@ public:
{
if (m_shouldLog)
{
+#if BEAST_MSVC
+ if (beast_isRunningUnderDebugger ())
+ {
+ Logger::outputDebugString (message);
+ }
+ else
+ {
+ std::cout << message.toStdString () << std::endl;
+ }
+
+#else
std::cout << message.toStdString () << std::endl;
+
+#endif
}
}
diff --git a/modules/ripple_app/network/WSDoor.cpp b/modules/ripple_app/network/WSDoor.cpp
index ba6a3423e..7f1ebf134 100644
--- a/modules/ripple_app/network/WSDoor.cpp
+++ b/modules/ripple_app/network/WSDoor.cpp
@@ -21,9 +21,36 @@ SETUP_LOG (WSDoor)
//
// VFALCO NOTE NetworkOPs isn't used here...
//
-void WSDoor::startListening ()
+
+WSDoor::WSDoor (std::string const& strIp, int iPort, bool bPublic)
+ : Thread ("websocket")
+ , mPublic (bPublic)
+ , mIp (strIp)
+ , mPort (iPort)
{
- setCallingThreadName ("websocket");
+ startThread ();
+}
+
+WSDoor::~WSDoor ()
+{
+ {
+ CriticalSection::ScopedLockType lock (m_endpointLock);
+
+ if (m_endpoint != nullptr)
+ m_endpoint->stop ();
+ }
+
+ signalThreadShouldExit ();
+ waitForThreadToExit ();
+}
+
+void WSDoor::run ()
+{
+ WriteLog (lsINFO, WSDoor) << boost::str (boost::format ("Websocket: %s: Listening: %s %d ")
+ % (mPublic ? "Public" : "Private")
+ % mIp
+ % mPort);
+
// Generate a single SSL context for use by all connections.
boost::shared_ptr mCtx;
mCtx = boost::make_shared (boost::asio::ssl::context::sslv23);
@@ -35,11 +62,13 @@ void WSDoor::startListening ()
SSL_CTX_set_tmp_dh_callback (mCtx->native_handle (), handleTmpDh);
- // Construct a single handler for all requests.
websocketpp::server_autotls::handler::ptr handler (new WSServerHandler (mCtx, mPublic));
- // Construct a websocket server.
- mSEndpoint = new websocketpp::server_autotls (handler);
+ {
+ CriticalSection::ScopedLockType lock (m_endpointLock);
+
+ m_endpoint = new websocketpp::server_autotls (handler);
+ }
// mEndpoint->alog().unset_level(websocketpp::log::alevel::ALL);
// mEndpoint->elog().unset_level(websocketpp::log::elevel::ALL);
@@ -47,7 +76,7 @@ void WSDoor::startListening ()
// Call the main-event-loop of the websocket server.
try
{
- mSEndpoint->listen (
+ m_endpoint->listen (
boost::asio::ip::tcp::endpoint (
boost::asio::ip::address ().from_string (mIp), mPort));
}
@@ -60,7 +89,7 @@ void WSDoor::startListening ()
// https://github.com/zaphoyd/websocketpp/issues/98
try
{
- mSEndpoint->get_io_service ().run ();
+ m_endpoint->get_io_service ().run ();
break;
}
catch (websocketpp::exception& e)
@@ -70,32 +99,18 @@ void WSDoor::startListening ()
}
}
- delete mSEndpoint;
-}
-
-WSDoor* WSDoor::createWSDoor (const std::string& strIp, const int iPort, bool bPublic)
-{
- WSDoor* wdpResult = new WSDoor (strIp, iPort, bPublic);
-
- WriteLog (lsINFO, WSDoor) <<
- boost::str (boost::format ("Websocket: %s: Listening: %s %d ")
- % (bPublic ? "Public" : "Private")
- % strIp
- % iPort);
-
- wdpResult->mThread = new boost::thread (BIND_TYPE (&WSDoor::startListening, wdpResult));
-
- return wdpResult;
+ delete m_endpoint;
}
void WSDoor::stop ()
{
- if (mThread)
{
- if (mSEndpoint)
- mSEndpoint->stop ();
+ CriticalSection::ScopedLockType lock (m_endpointLock);
-
- mThread->join ();
+ if (m_endpoint != nullptr)
+ m_endpoint->stop ();
}
+
+ signalThreadShouldExit ();
+ waitForThreadToExit ();
}
diff --git a/modules/ripple_app/network/WSDoor.h b/modules/ripple_app/network/WSDoor.h
index 068cd61ce..39f64d0d3 100644
--- a/modules/ripple_app/network/WSDoor.h
+++ b/modules/ripple_app/network/WSDoor.h
@@ -7,28 +7,24 @@
#ifndef RIPPLE_WSDOOR_RIPPLEHEADER
#define RIPPLE_WSDOOR_RIPPLEHEADER
-class WSDoor : LeakChecked
+class WSDoor : protected Thread, LeakChecked
{
-private:
- websocketpp::server_autotls* mSEndpoint;
+public:
+ WSDoor (std::string const& strIp, int iPort, bool bPublic);
- boost::thread* mThread;
+ ~WSDoor ();
+
+ void stop ();
+
+private:
+ void run ();
+
+private:
+ ScopedPointer m_endpoint;
+ CriticalSection m_endpointLock;
bool mPublic;
std::string mIp;
int mPort;
-
- void startListening ();
-
-public:
-
- WSDoor (const std::string& strIp, int iPort, bool bPublic) : mSEndpoint (0), mThread (0), mPublic (bPublic), mIp (strIp), mPort (iPort)
- {
- ;
- }
-
- void stop ();
-
- static WSDoor* createWSDoor (const std::string& strIp, const int iPort, bool bPublic);
};
#endif
diff --git a/modules/ripple_app/ripple_app.cpp b/modules/ripple_app/ripple_app.cpp
index 11ff12915..f8c11546e 100644
--- a/modules/ripple_app/ripple_app.cpp
+++ b/modules/ripple_app/ripple_app.cpp
@@ -60,8 +60,6 @@
#include
#include
-//#include "../ripple_sqlite/ripple_sqlite.h" // for SqliteDatabase.cpp
-
#include "../ripple_core/ripple_core.h"
#include "beast/modules/beast_db/beast_db.h"
diff --git a/modules/ripple_core/functional/ripple_JobQueue.cpp b/modules/ripple_core/functional/ripple_JobQueue.cpp
index db80cacc7..c3cd05168 100644
--- a/modules/ripple_core/functional/ripple_JobQueue.cpp
+++ b/modules/ripple_core/functional/ripple_JobQueue.cpp
@@ -6,11 +6,9 @@
SETUP_LOG (JobQueue)
-JobQueue::JobQueue (boost::asio::io_service& svc)
- : mLastJob (0)
- , mThreadCount (0)
- , mShuttingDown (false)
- , mIOService (svc)
+JobQueue::JobQueue ()
+ : m_workers (*this, 0)
+ , mLastJob (0)
{
mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000);
mJobLoads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000);
@@ -30,9 +28,13 @@ JobQueue::JobQueue (boost::asio::io_service& svc)
mJobLoads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500);
}
+JobQueue::~JobQueue ()
+{
+}
+
void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE& jobFunc)
{
- addLimitJob(type, name, 0, jobFunc);
+ addLimitJob (type, name, 0, jobFunc);
}
void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, const FUNCTION_TYPE& jobFunc)
@@ -41,21 +43,26 @@ void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, co
boost::mutex::scoped_lock sl (mJobLock);
- if (type != jtCLIENT) // FIXME: Workaround incorrect client shutdown ordering
- assert (mThreadCount != 0); // do not add jobs to a queue with no threads
+ // FIXME: Workaround incorrect client shutdown ordering
+ // do not add jobs to a queue with no threads
+ bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
std::pair< std::set ::iterator, bool > it =
mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc));
+
it.first->peekEvent().start(); // start timing how long it stays in the queue
+
++mJobCounts[type].first;
- mJobCond.notify_one ();
+
+ m_workers.addTask ();
}
int JobQueue::getJobCount (JobType t)
{
boost::mutex::scoped_lock sl (mJobLock);
- std::map< JobType, std::pair >::iterator c = mJobCounts.find (t);
+ JobCounts::iterator c = mJobCounts.find (t);
+
return (c == mJobCounts.end ()) ? 0 : c->second.first;
}
@@ -63,7 +70,8 @@ int JobQueue::getJobCountTotal (JobType t)
{
boost::mutex::scoped_lock sl (mJobLock);
- std::map< JobType, std::pair >::iterator c = mJobCounts.find (t);
+ JobCounts::iterator c = mJobCounts.find (t);
+
return (c == mJobCounts.end ()) ? 0 : (c->second.first + c->second.second);
}
@@ -74,11 +82,13 @@ int JobQueue::getJobCountGE (JobType t)
boost::mutex::scoped_lock sl (mJobLock);
- typedef std::map< JobType, std::pair >::value_type jt_int_pair;
- BOOST_FOREACH (const jt_int_pair & it, mJobCounts)
+ typedef JobCounts::value_type jt_int_pair;
- if (it.first >= t)
- ret += it.second.first;
+ BOOST_FOREACH (jt_int_pair const& it, mJobCounts)
+ {
+ if (it.first >= t)
+ ret += it.second.first;
+ }
return ret;
}
@@ -89,11 +99,15 @@ std::vector< std::pair > > JobQueue::getJobCounts (
std::vector< std::pair > > ret;
boost::mutex::scoped_lock sl (mJobLock);
+
ret.reserve (mJobCounts.size ());
- typedef std::map< JobType, std::pair >::value_type jt_int_pair;
+ typedef JobCounts::value_type jt_int_pair;
+
BOOST_FOREACH (const jt_int_pair & it, mJobCounts)
- ret.push_back (it);
+ {
+ ret.push_back (it);
+ }
return ret;
}
@@ -101,9 +115,10 @@ std::vector< std::pair > > JobQueue::getJobCounts (
Json::Value JobQueue::getJson (int)
{
Json::Value ret (Json::objectValue);
+
boost::mutex::scoped_lock sl (mJobLock);
- ret["threads"] = mThreadCount;
+ ret["threads"] = m_workers.getNumberOfThreads ();
Json::Value priorities = Json::arrayValue;
@@ -116,7 +131,7 @@ Json::Value JobQueue::getJson (int)
int jobCount, threadCount;
bool isOver;
mJobLoads[i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver);
- std::map< JobType, std::pair >::iterator it = mJobCounts.find (static_cast (i));
+ JobCounts::iterator it = mJobCounts.find (static_cast (i));
if (it == mJobCounts.end ())
{
@@ -165,6 +180,7 @@ Json::Value JobQueue::getJson (int)
bool JobQueue::isOverloaded ()
{
int count = 0;
+
boost::mutex::scoped_lock sl (mJobLock);
for (int i = 0; i < NUM_JOB_TYPES; ++i)
@@ -174,16 +190,13 @@ bool JobQueue::isOverloaded ()
return count > 0;
}
+// shut down the job queue without completing pending jobs
+//
void JobQueue::shutdown ()
{
- // shut down the job queue without completing pending jobs
WriteLog (lsINFO, JobQueue) << "Job queue shutting down";
- boost::mutex::scoped_lock sl (mJobLock);
- mShuttingDown = true;
- mJobCond.notify_all ();
- while (mThreadCount != 0)
- mJobCond.wait (sl);
+ m_workers.pauseAllThreadsAndWait ();
}
// set the number of thread serving the job queue to precisely this number
@@ -195,7 +208,7 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
}
else if (c == 0)
{
- c = boost::thread::hardware_concurrency ();
+ c = SystemStats::getNumCpus ();
// VFALCO NOTE According to boost, hardware_concurrency cannot return
// negative numbers/
@@ -210,113 +223,83 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
WriteLog (lsINFO, JobQueue) << "Auto-tuning to " << c << " validation/transaction/proposal threads";
}
- // VFALCO TODO Split the function up. The lower part actually does the "do",
- // The part above this comment figures out the value for numThreads
- //
- boost::mutex::scoped_lock sl (mJobLock);
-
- while (mJobCounts[jtDEATH].first != 0)
- {
- mJobCond.wait (sl);
- }
-
- while (mThreadCount < c)
- {
- ++mThreadCount;
- boost::thread (BIND_TYPE (&JobQueue::threadEntry, this)).detach ();
- }
-
- while (mThreadCount > c)
- {
- if (mJobCounts[jtDEATH].first != 0)
- {
- mJobCond.wait (sl);
- }
- else
- {
- mJobSet.insert (Job (jtDEATH, 0));
- ++ (mJobCounts[jtDEATH].first);
- }
- }
-
- mJobCond.notify_one (); // in case we sucked up someone else's signal
+ m_workers.setNumberOfThreads (c);
}
bool JobQueue::getJob(Job& job)
{
- if (mJobSet.empty() || mShuttingDown)
- return false;
+ bool gotJob = false;
- std::set::iterator it = mJobSet.begin ();
-
- while (1)
+ if (! mJobSet.empty ())
{
- // Are we out of jobs?
- if (it == mJobSet.end())
- return false;
+ std::set::iterator it = mJobSet.begin ();
- // Does this job have no limit?
- if (it->getLimit() == 0)
- break;
+ for (;;)
+ {
+ // VFALCO NOTE how can we be out of jobs if we just checked mJobSet.empty ()?
+ //
+ // Are we out of jobs?
+ if (it == mJobSet.end())
+ return false; // VFALCO TODO get rid of this return from the middle
- // Is this job category below the limit?
- if (mJobCounts[it->getType()].second < it->getLimit())
- break;
+ // Does this job have no limit?
+ if (it->getLimit() == 0)
+ break;
- // Try the next job, if any
- ++it;
+ // Is this job category below the limit?
+ if (mJobCounts[it->getType()].second < it->getLimit())
+ break;
+
+ // Try the next job, if any
+ ++it;
+ }
+
+ job = *it;
+ mJobSet.erase (it);
+
+ gotJob = true;
}
- job = *it;
- mJobSet.erase (it);
-
- return true;
+ return gotJob;
}
-// do jobs until asked to stop
-void JobQueue::threadEntry ()
+void JobQueue::processTask ()
{
- boost::mutex::scoped_lock sl (mJobLock);
-
- while (1)
{
- JobType type;
+ // This lock shouldn't be needed
+ boost::mutex::scoped_lock lock (mJobLock);
- setCallingThreadName ("waiting");
+ JobType type (jtINVALID);
{
Job job;
- while (!getJob(job))
+
+ bool const haveJob = getJob (job);
+
+ if (haveJob)
{
- if (mShuttingDown)
- {
- --mThreadCount;
- mJobCond.notify_all();
- return;
- }
- mJobCond.wait (sl);
+ type = job.getType ();
+
+ // VFALCO TODO Replace with Atomic <>
+ --(mJobCounts[type].first);
+ ++(mJobCounts[type].second);
+
+ lock.unlock ();
+
+ Thread::setCurrentThreadName (Job::toString (type));
+
+ WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job";
+
+ job.doJob ();
}
- type = job.getType ();
- -- (mJobCounts[type].first);
+ // must destroy job, here, without holding lock
+ }
- if (type == jtDEATH)
- {
- --mThreadCount;
- mJobCond.notify_all();
- return;
- }
-
- ++ (mJobCounts[type].second);
- sl.unlock ();
- setCallingThreadName (Job::toString (type));
- WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job";
- job.doJob ();
- } // must destroy job without holding lock
-
- sl.lock ();
- -- (mJobCounts[type].second);
+ if (type != jtINVALID)
+ {
+ lock.lock ();
+ -- (mJobCounts[type].second);
+ }
}
}
-
-// vim:ts=4
diff --git a/modules/ripple_core/functional/ripple_JobQueue.h b/modules/ripple_core/functional/ripple_JobQueue.h
index 30f877197..eb258d330 100644
--- a/modules/ripple_core/functional/ripple_JobQueue.h
+++ b/modules/ripple_core/functional/ripple_JobQueue.h
@@ -4,13 +4,17 @@
*/
//==============================================================================
-#ifndef RIPPLE_JOBQUEUE_H
-#define RIPPLE_JOBQUEUE_H
+#ifndef RIPPLE_JOBQUEUE_H_INCLUDED
+#define RIPPLE_JOBQUEUE_H_INCLUDED
-class JobQueue
+class JobQueue : private Workers::Callback
{
public:
- explicit JobQueue (boost::asio::io_service&);
+ typedef std::map > JobCounts;
+
+ JobQueue ();
+
+ ~JobQueue ();
// VFALCO TODO make convenience functions that allow the caller to not
// have to call bind.
@@ -46,22 +50,17 @@ public:
Json::Value getJson (int c = 0);
private:
- void threadEntry ();
-
- boost::mutex mJobLock;
- boost::condition_variable mJobCond;
-
- uint64 mLastJob;
- std::set mJobSet;
- LoadMonitor mJobLoads [NUM_JOB_TYPES];
- int mThreadCount;
- bool mShuttingDown;
-
- boost::asio::io_service& mIOService;
-
- std::map > mJobCounts;
-
bool getJob (Job& job);
+ void processTask ();
+
+private:
+ Workers m_workers;
+
+ boost::mutex mJobLock; // VFALCO TODO Replace with CriticalSection
+ uint64 mLastJob;
+ std::set mJobSet;
+ LoadMonitor mJobLoads [NUM_JOB_TYPES];
+ JobCounts mJobCounts;
};
#endif
diff --git a/rippled.1 b/rippled.1
deleted file mode 100644
index 8617000b8..000000000
Binary files a/rippled.1 and /dev/null differ