Merge branch 'develop' of github.com:ripple/rippled into develop

JoelKatz
2013-07-29 14:18:17 -07:00
28 changed files with 1124 additions and 565 deletions

Binary file not shown.

View File

@@ -92,6 +92,7 @@
<ClInclude Include="..\..\modules\beast_basics\threads\beast_SerialFor.h" />
<ClInclude Include="..\..\modules\beast_basics\threads\beast_ThreadGroup.h" />
<ClInclude Include="..\..\modules\beast_basics\threads\beast_ThreadWithCallQueue.h" />
<ClInclude Include="..\..\modules\beast_basics\threads\beast_Workers.h" />
<ClInclude Include="..\..\modules\beast_core\beast_core.h" />
<ClInclude Include="..\..\modules\beast_core\containers\beast_AbstractFifo.h" />
<ClInclude Include="..\..\modules\beast_core\containers\beast_Array.h" />
@@ -352,6 +353,12 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\modules\beast_basics\threads\beast_Workers.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\modules\beast_core\beast_core.cpp" />
<ClCompile Include="..\..\modules\beast_core\containers\beast_AbstractFifo.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>

View File

@@ -728,6 +728,9 @@
<ClInclude Include="..\..\modules\beast_core\text\beast_LexicalCast.h">
<Filter>beast_core\text</Filter>
</ClInclude>
<ClInclude Include="..\..\modules\beast_basics\threads\beast_Workers.h">
<Filter>beast_basics\threads</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\..\modules\beast_core\beast_core.cpp">
@@ -1135,6 +1138,9 @@
<ClCompile Include="..\..\modules\beast_core\text\beast_LexicalCast.cpp">
<Filter>beast_core\text</Filter>
</ClCompile>
<ClCompile Include="..\..\modules\beast_basics\threads\beast_Workers.cpp">
<Filter>beast_basics\threads</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<Text Include="..\..\TODO.txt" />

View File

@@ -56,6 +56,7 @@ namespace beast
#include "threads/beast_ReadWriteMutex.cpp"
#include "threads/beast_ThreadGroup.cpp"
#include "threads/beast_ThreadWithCallQueue.cpp"
#include "threads/beast_Workers.cpp"
}

View File

@@ -268,6 +268,7 @@ namespace beast
#include "threads/beast_ManualCallQueue.h"
#include "threads/beast_ParallelFor.h"
#include "threads/beast_ThreadWithCallQueue.h"
#include "threads/beast_Workers.h"
}

View File

@@ -71,11 +71,6 @@ public:
*/
int getNumberOfThreads () const;
template <class F, class T1>
void operator () (int numberOfIterations, T1 t1)
{
}
/** Execute parallel for loop.
Functor is called once for each value in the range

View File

@@ -17,23 +17,6 @@
*/
//==============================================================================
Semaphore::WaitingThread::WaitingThread ()
: m_event (false) // auto-reset
{
}
void Semaphore::WaitingThread::wait ()
{
m_event.wait ();
}
void Semaphore::WaitingThread::signal ()
{
m_event.signal ();
}
//==============================================================================
Semaphore::Semaphore (int initialCount)
: m_counter (initialCount)
{
@@ -75,8 +58,10 @@ void Semaphore::signal (int amount)
}
}
void Semaphore::wait ()
bool Semaphore::wait (int timeOutMilliseconds)
{
bool signaled = true;
// Always prepare the WaitingThread object first, either
// from the delete list or through a new allocation.
//
@@ -107,11 +92,34 @@ void Semaphore::wait ()
if (waitingThread != nullptr)
{
// Yes so do it.
waitingThread->wait ();
signaled = waitingThread->wait (timeOutMilliseconds);
// If the wait is satisfied, then we've been taken off the
// waiting list so put waitingThread back in the delete list.
//
m_deleteList.push_front (waitingThread);
}
return signaled;
}
//------------------------------------------------------------------------------
Semaphore::WaitingThread::WaitingThread ()
: m_event (false) // auto-reset
{
}
bool Semaphore::WaitingThread::wait (int timeOutMilliseconds)
{
return m_event.wait (timeOutMilliseconds);
}
void Semaphore::WaitingThread::signal ()
{
m_event.signal ();
}
//------------------------------------------------------------------------------
// VFALCO TODO Unit Tests!

View File

@@ -49,8 +49,12 @@ public:
void signal (int amount = 1);
/** Wait for a resource.
A negative time-out value means that the method will wait indefinitely.
@returns true if the event has been signalled, false if the timeout expires.
*/
void wait ();
bool wait (int timeOutMilliseconds = -1);
private:
class WaitingThread
@@ -60,7 +64,7 @@ private:
public:
WaitingThread ();
void wait ();
bool wait (int timeOutMilliseconds);
void signal ();
private:
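
A minimal sketch of the new timed-wait interface, using only the names shown in the header above (the producer/consumer framing is illustrative, not part of the commit):

    Semaphore sem (0);          // no resources available initially

    // Producer side: make one resource available.
    sem.signal ();

    // Consumer side: wait up to 500 milliseconds for a resource.
    if (sem.wait (500))
    {
        // acquired a resource before the timeout
    }
    else
    {
        // timeout expired without a signal
    }

    sem.wait ();                // default of -1 keeps the old blocking behaviour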

View File

@@ -0,0 +1,276 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
Workers::Workers (Callback& callback, int numberOfThreads)
: m_callback (callback)
, m_allPaused (true, true)
, m_semaphore (0)
, m_numberOfThreads (0)
{
setNumberOfThreads (numberOfThreads);
}
Workers::~Workers ()
{
pauseAllThreadsAndWait ();
deleteWorkers (m_everyone);
}
int Workers::getNumberOfThreads () const noexcept
{
return m_numberOfThreads;
}
// VFALCO NOTE if this function is called quickly to reduce then
// increase the number of threads, it could result in
// more paused threads being created than expected.
//
void Workers::setNumberOfThreads (int numberOfThreads)
{
if (m_numberOfThreads != numberOfThreads)
{
if (numberOfThreads > m_numberOfThreads)
{
// Increasing the number of working threads
int const amount = numberOfThreads - m_numberOfThreads;
for (int i = 0; i < amount; ++i)
{
// See if we can reuse a paused worker
Worker* worker = m_paused.pop_front ();
if (worker != nullptr)
{
// If we got here then the worker thread is at [1]
// This will unblock their call to wait()
//
worker->notify ();
}
else
{
worker = new Worker (*this);
}
m_everyone.push_front (worker);
}
}
else if (numberOfThreads < m_numberOfThreads)
{
// Decreasing the number of working threads
int const amount = m_numberOfThreads - numberOfThreads;
for (int i = 0; i < amount; ++i)
{
++m_pauseCount;
// Pausing a thread counts as one "internal task"
m_semaphore.signal ();
}
}
m_numberOfThreads = numberOfThreads;
}
}
void Workers::pauseAllThreadsAndWait ()
{
setNumberOfThreads (0);
m_allPaused.wait ();
}
void Workers::addTask ()
{
m_semaphore.signal ();
}
void Workers::deleteWorkers (LockFreeStack <Worker>& stack)
{
for (;;)
{
Worker* const worker = stack.pop_front ();
if (worker != nullptr)
{
// This call blocks until the thread orderly exits
delete worker;
}
else
{
break;
}
}
}
//------------------------------------------------------------------------------
Workers::Worker::Worker (Workers& workers)
: Thread ("Worker")
, m_workers (workers)
{
startThread ();
}
Workers::Worker::~Worker ()
{
stopThread ();
}
void Workers::Worker::run ()
{
while (! threadShouldExit ())
{
// Increment the count of active workers, and if
// we are the first one then reset the "all paused" event
//
if (++m_workers.m_activeCount == 1)
m_workers.m_allPaused.reset ();
for (;;)
{
// Acquire a task or "internal task."
//
m_workers.m_semaphore.wait ();
// See if there's a pause request. This
// counts as an "internal task."
//
int pauseCount = m_workers.m_pauseCount.get ();
if (pauseCount > 0)
{
// Try to decrement
pauseCount = --m_workers.m_pauseCount;
if (pauseCount >= 0)
{
// We got paused
break;
}
else
{
// Undo our decrement
++m_workers.m_pauseCount;
}
}
// We couldn't pause so we must have gotten
// unblocked in order to process a task.
//
m_workers.m_callback.processTask ();
}
// Any worker that goes into the paused list must
// guarantee that it will eventually block on its
// event object.
//
m_workers.m_paused.push_front (this);
// Decrement the count of active workers, and if we
// are the last one then signal the "all paused" event.
//
if (--m_workers.m_activeCount == 0)
m_workers.m_allPaused.signal ();
// [1] We will be here when the paused list is popped
//
// We block on our event object, a requirement of being
// put into the paused list.
//
// This will get signaled on either a reactivate or a stopThread()
//
wait ();
}
}
//------------------------------------------------------------------------------
class WorkersTests : public UnitTest
{
public:
WorkersTests () : UnitTest ("Workers", "beast")
{
}
struct TestCallback : Workers::Callback
{
explicit TestCallback (int count_)
: finished (false, count_ == 0)
, count (count_)
{
}
void processTask ()
{
if (--count == 0)
finished.signal ();
}
WaitableEvent finished;
Atomic <int> count;
};
void testThreads (int const threadCount)
{
String s;
s << "threadCount = " << String (threadCount);
beginTestCase (s);
TestCallback cb (threadCount);
Workers w (cb, 0);
expect (w.getNumberOfThreads () == 0);
w.setNumberOfThreads (threadCount);
expect (w.getNumberOfThreads () == threadCount);
for (int i = 0; i < threadCount; ++i)
w.addTask ();
// 10 seconds should be enough to finish on any system
//
bool signaled = cb.finished.wait (10 * 1000);
expect (signaled, "timed out");
w.pauseAllThreadsAndWait ();
int const count (cb.count.get ());
expectEquals (count, 0);
}
void runTest ()
{
testThreads (0);
testThreads (1);
testThreads (2);
testThreads (4);
testThreads (16);
testThreads (64);
testThreads (128);
testThreads (256);
testThreads (512);
}
};
static WorkersTests workersTests;

View File

@@ -0,0 +1,122 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_WORKERS_H_INCLUDED
#define BEAST_WORKERS_H_INCLUDED
/** A group of threads that process tasks.
*/
class Workers
{
public:
/** Called to perform tasks as needed. */
struct Callback
{
/** Perform a task.
The call is made on a thread owned by Workers.
*/
virtual void processTask () = 0;
};
/** Create the object.
A number of initial threads may be optionally specified. The
default is to create one thread per CPU.
*/
explicit Workers (Callback& callback, int numberOfThreads = SystemStats::getNumCpus ());
~Workers ();
/** Retrieve the desired number of threads.
This just returns the number of active threads that were requested. If
there was a recent call to setNumberOfThreads, the actual number of active
threads may be temporarily different from what was last requested.
@note This function is not thread-safe.
*/
int getNumberOfThreads () const noexcept;
/** Set the desired number of threads.
@note This function is not thread-safe.
*/
void setNumberOfThreads (int numberOfThreads);
/** Pause all threads and wait until they are paused.
If a thread is processing a task it will pause as soon as the task
completes. There may still be tasks signaled even after all threads
have paused.
@note This function is not thread-safe.
*/
void pauseAllThreadsAndWait ();
/** Increment the number of tasks.
The callback will be called for each task.
@note This function is thread-safe.
*/
void addTask ();
//--------------------------------------------------------------------------
private:
struct PausedTag { };
/* A Worker executes tasks on its provided thread.
These are the states:
Active: Running the task processing loop.
Idle: Active, but blocked on waiting for a task.
Paused: Blocked waiting to exit or become active.
*/
class Worker
: public LockFreeStack <Worker>::Node
, public LockFreeStack <Worker, PausedTag>::Node
, public Thread
{
public:
explicit Worker (Workers& workers);
~Worker ();
private:
void run ();
private:
Workers& m_workers;
};
private:
static void deleteWorkers (LockFreeStack <Worker>& stack);
private:
Callback& m_callback;
WaitableEvent m_allPaused; // signaled when all threads paused
Semaphore m_semaphore; // each pending task is 1 resource
int m_numberOfThreads; // how many we want active now
Atomic <int> m_activeCount; // to know when all are paused
Atomic <int> m_pauseCount; // how many threads need to pause now
LockFreeStack <Worker> m_everyone; // holds all created workers
LockFreeStack <Worker, PausedTag> m_paused; // holds just paused workers
};
#endif
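
A condensed usage sketch of the interface above, mirroring WorkersTests in the .cpp file (MyCallback and the loop bounds are illustrative):

    // Tasks are counted, not passed: addTask() increments the pending count
    // and Workers invokes processTask() once per task on one of its threads.
    struct MyCallback : Workers::Callback
    {
        void processTask ()
        {
            // perform one unit of work
        }
    };

    MyCallback callback;
    Workers workers (callback, 4);       // four worker threads

    for (int i = 0; i < 100; ++i)
        workers.addTask ();              // thread-safe

    workers.pauseAllThreadsAndWait ();   // blocks until every worker is paused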

View File

@@ -253,8 +253,8 @@ public:
/** Compares two values, and if they don't match, prints out a message containing the
expected and actual result values.
*/
template <class ValueType>
void expectEquals (ValueType actual, ValueType expected, String failureMessage = String::empty)
template <class ActualType, class ExpectedType>
void expectEquals (ActualType actual, ExpectedType expected, String failureMessage = String::empty)
{
const bool result = (actual == expected);
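
Splitting the single ValueType into ActualType and ExpectedType lets the two arguments deduce to different types, which is what the mixed-type checks added to StringTests later in this commit rely on, for example:

    expectEquals (String (int (0)), "0");       // String vs. const char*
    expectEquals (String (uint64 (1)), "1");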

View File

@@ -54,7 +54,7 @@ void CriticalSection::exit() const noexcept
//==============================================================================
WaitableEvent::WaitableEvent (const bool useManualReset) noexcept
WaitableEvent::WaitableEvent (const bool useManualReset, bool initiallySignaled) noexcept
: triggered (false), manualReset (useManualReset)
{
pthread_cond_init (&condition, 0);
@@ -65,6 +65,9 @@ WaitableEvent::WaitableEvent (const bool useManualReset) noexcept
pthread_mutexattr_setprotocol (&atts, PTHREAD_PRIO_INHERIT);
#endif
pthread_mutex_init (&mutex, &atts);
if (initiallySignaled)
signal ();
}
WaitableEvent::~WaitableEvent() noexcept

View File

@@ -80,9 +80,11 @@ void CriticalSection::exit() const noexcept
}
//==============================================================================
WaitableEvent::WaitableEvent (const bool manualReset) noexcept
WaitableEvent::WaitableEvent (const bool manualReset, bool initiallySignaled) noexcept
: internal (CreateEvent (0, manualReset ? TRUE : FALSE, FALSE, 0))
{
if (initiallySignaled)
signal ();
}
WaitableEvent::~WaitableEvent() noexcept

View File

@@ -5,7 +5,7 @@
Portions of this file are from JUCE.
Copyright (c) 2013 - Raw Material Software Ltd.
Please visit http://www.juce.com
Please visit http://www.beast.com
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -131,8 +131,8 @@ public:
if (start.getAddress() == nullptr || start.isEmpty())
return getEmpty();
const size_t numBytes = (size_t)( reinterpret_cast<const char*> (end.getAddress())
- reinterpret_cast<const char*> (start.getAddress()));
const size_t numBytes = (size_t) (reinterpret_cast<const char*> (end.getAddress())
- reinterpret_cast<const char*> (start.getAddress()));
const CharPointerType dest (createUninitialisedBytes (numBytes + sizeof (CharType)));
memcpy (dest.getAddress(), start, numBytes);
dest.getAddress()[numBytes / sizeof (CharType)] = 0;
@@ -358,88 +358,54 @@ String String::charToString (const beast_wchar character)
//==============================================================================
namespace NumberToStringConverters
{
template <typename Type>
static inline char* printDigits (char* t, Type v) noexcept
{
*--t = 0;
do
{
*--t = '0' + (char) (v % 10);
v /= 10;
} while (v > 0);
return t;
}
// pass in a pointer to the END of a buffer..
static char* numberToString (char* t, const int64 n) noexcept
{
if (n > 0)
{
*--t = 0;
uint64 v = static_cast <uint64> (n);
do
{
*--t = (char) ('0' + (int) (v % 10));
v /= 10;
}
while (v > 0);
}
else
{
*--t = 0;
uint64 v = ((uint64)(-(n + 1)) + 1);
do
{
*--t = (char) ('0' + (int) (v % 10));
v /= 10;
}
while (v > 0);
*--t = '-';
}
if (n >= 0)
return printDigits (t, static_cast<uint64> (n));
// NB: this needs to be careful not to call -std::numeric_limits<int64>::min(),
// which has undefined behaviour
t = printDigits (t, static_cast<uint64> (-(n + 1)) + 1);
*--t = '-';
return t;
}
static char* numberToString (char* t, uint64 v) noexcept
{
*--t = 0;
do
{
*--t = (char) ('0' + (int) (v % 10));
v /= 10;
} while (v > 0);
return t;
return printDigits (t, v);
}
static char* numberToString (char* t, const int n) noexcept
{
if (n == (int) 0x80000000) // (would cause an overflow)
return numberToString (t, (int64) n);
*--t = 0;
int v = abs (n);
do
{
*--t = (char) ('0' + (v % 10));
v /= 10;
} while (v > 0);
if (n < 0)
*--t = '-';
if (n >= 0)
return printDigits (t, static_cast<unsigned int> (n));
// NB: this needs to be careful not to call -std::numeric_limits<int>::min(),
// which has undefined behaviour
t = printDigits (t, static_cast<unsigned int> (-(n + 1)) + 1);
*--t = '-';
return t;
}
static char* numberToString (char* t, unsigned int v) noexcept
{
*--t = 0;
do
{
*--t = (char) ('0' + (v % 10));
v /= 10;
} while (v > 0);
return t;
return printDigits (t, v);
}
static char* doubleToString (char* buffer, const int numChars, double n, int numDecPlaces, size_t& len) noexcept
@@ -492,7 +458,6 @@ namespace NumberToStringConverters
char buffer [32];
char* const end = buffer + numElementsInArray (buffer);
char* const start = numberToString (end, number);
return StringHolder::createFromFixedLength (start, (size_t) (end - start - 1));
}
@@ -2094,7 +2059,9 @@ String String::fromUTF8 (const char* const buffer, int bufferSizeBytes)
class StringTests : public UnitTest
{
public:
StringTests() : UnitTest ("String", "beast") { }
StringTests() : UnitTest ("String", "beast")
{
}
template <class CharPointerType>
struct TestUTFConversion
@@ -2236,6 +2203,10 @@ public:
expect (String ((int64) -1234).getLargeIntValue() == -1234);
expect (String (-1234.56).getDoubleValue() == -1234.56);
expect (String (-1234.56f).getFloatValue() == -1234.56f);
expect (String (std::numeric_limits<int>::max()).getIntValue() == std::numeric_limits<int>::max());
expect (String (std::numeric_limits<int>::min()).getIntValue() == std::numeric_limits<int>::min());
expect (String (std::numeric_limits<int64>::max()).getLargeIntValue() == std::numeric_limits<int64>::max());
expect (String (std::numeric_limits<int64>::min()).getLargeIntValue() == std::numeric_limits<int64>::min());
expect (("xyz" + s).getTrailingIntValue() == s.getIntValue());
expect (s.getHexValue32() == 0x12345678);
expect (s.getHexValue64() == (int64) 0x12345678);
@@ -2243,6 +2214,24 @@ public:
expect (String::toHexString ((int64) 0x1234abcd).equalsIgnoreCase ("1234abcd"));
expect (String::toHexString ((short) 0x12ab).equalsIgnoreCase ("12ab"));
expectEquals (String (int (0)), "0");
expectEquals (String (short (0)), "0");
expectEquals (String (int64 (0)), "0");
expectEquals (String ((unsigned int) 0), "0");
expectEquals (String ((unsigned short) 0), "0");
expectEquals (String (uint64 (0)), "0");
expectEquals (String (int (-1)), "-1");
expectEquals (String (short (-1)), "-1");
expectEquals (String (int64 (-1)), "-1");
expectEquals (String (int (1)), "1");
expectEquals (String (short (1)), "1");
expectEquals (String (int64 (1)), "1");
expectEquals (String ((unsigned int) 1), "1");
expectEquals (String ((unsigned short) 1), "1");
expectEquals (String (uint64 (1)), "1");
unsigned char data[] = { 1, 2, 3, 4, 0xa, 0xb, 0xc, 0xd };
expect (String::toHexString (data, 8, 0).equalsIgnoreCase ("010203040a0b0c0d"));
expect (String::toHexString (data, 8, 1).equalsIgnoreCase ("01 02 03 04 0a 0b 0c 0d"));
@@ -2418,4 +2407,5 @@ public:
}
};
static StringTests stringTests;
static StringTests stringUnitTests;
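
The printDigits refactor above keeps the note about never negating the most negative value directly; a worked example of the -(n + 1) + 1 idiom, assuming a 32-bit int:

    int const n = std::numeric_limits<int>::min ();   // -2147483648 (needs <limits>)
    // -n would overflow: 2147483648 does not fit in int (undefined behaviour).
    unsigned int const magnitude = static_cast<unsigned int> (-(n + 1)) + 1;
    // -(n + 1) == 2147483647, still representable; adding 1 after the cast
    // yields 2147483648u, the correct magnitude. printDigits() then emits the
    // digits and the caller prepends '-'.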

View File

@@ -113,7 +113,7 @@ public:
value in here will wait forever.
@see signalThreadShouldExit, threadShouldExit, waitForThreadToExit, isThreadRunning
*/
void stopThread (int timeOutMilliseconds);
void stopThread (int timeOutMilliseconds = -1);
//==============================================================================
/** Returns true if the thread is currently active */
@@ -150,7 +150,7 @@ public:
is less than zero, it will wait forever.
@returns true if the thread exits, or false if the timeout expires first.
*/
bool waitForThreadToExit (int timeOutMilliseconds) const;
bool waitForThreadToExit (int timeOutMilliseconds = -1) const;
//==============================================================================
/** Changes the thread's priority.
@@ -205,7 +205,7 @@ public:
@returns true if the event has been signalled, false if the timeout expires.
*/
bool wait (int timeOutMilliseconds) const;
bool wait (int timeOutMilliseconds = -1) const;
/** Wakes up the thread.
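
These new default arguments let callers that want the unconditional blocking behaviour drop the timeout, as the Worker destructor earlier in this commit does; for example:

    thread.stopThread ();            // same as stopThread (-1): wait forever
    thread.waitForThreadToExit ();   // same as waitForThreadToExit (-1)
    thread.wait ();                  // same as wait (-1)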

View File

@@ -44,8 +44,11 @@ public:
@param manualReset If this is false, the event will be reset automatically when the wait()
method is called. If manualReset is true, then once the event is signalled,
the only way to reset it will be by calling the reset() method.
@param initiallySignaled If this is true then the event will be signaled when
the constructor returns.
*/
explicit WaitableEvent (bool manualReset = false) noexcept;
explicit WaitableEvent (bool manualReset = false, bool initiallySignaled = false) noexcept;
/** Destructor.
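
A minimal sketch of the new flag, using the manual-reset case that Workers::m_allPaused above is built on (it is constructed as m_allPaused (true, true)):

    // manualReset = true, initiallySignaled = true: the event starts signaled,
    // so a wait() issued before any worker ever becomes active returns at once.
    WaitableEvent allPaused (true, true);

    allPaused.wait ();      // returns immediately
    allPaused.reset ();     // cleared when the first worker becomes active
    allPaused.signal ();    // set again when the last active worker pauses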

View File

@@ -708,7 +708,7 @@ public:
maxPayloadBytes = 8 * 1024
};
KeyvaDBTests () : UnitTest ("KeyvaDB", "ripple")
KeyvaDBTests () : UnitTest ("KeyvaDB", "beast")
{
}

View File

@@ -5,10 +5,10 @@ RIPPLE TODO
Vinnie's List: Changes day to day, descending priority
(Items marked '*' can be handled by others.)
- Allow skipped/disabled unit tests and reporting.
- Show summary for text output of unit test results
- Make ProofOfWorkTests manual since they aren't used
* Make everyone check GitHub Issues every day
- Do something about the throw() reporting weaknesses:
* Make sure all Sconstruct and .pro builds have debug symbols in release
* Replace all throw with beast::Throw()
@@ -40,9 +40,23 @@ David Feature:
--------------------------------------------------------------------------------
- Consolidate databases
- Figure out why we need WAL sqlite mode if we no longer use sqlite for the node store
- Add "skipped" field to beginTestCase() to disable a test but still record
that it was skipped in the output. Like for mdb import.
- use beast DeadlineTimer for sweep in Application
- Make SNTP Client have its own io_service
- Get rid of 'ref' typedefs that really mean const&
- Use secp256k1 from beast
- Fix xsd/dtd line in JUnit XML output
- Get rid of the WriteLog() stuff in the ripple tests and make it report the
message directly to the UnitTest object. Then update the JUnit XML output
routines to also write the auxiliary messages.

View File

@@ -33,8 +33,11 @@ SqliteStatement::~SqliteStatement ()
sqlite3_finalize (statement);
}
//------------------------------------------------------------------------------
SqliteDatabase::SqliteDatabase (const char* host)
: Database (host)
, m_thread ("sqlitedb")
, mWalQ (NULL)
, walRunning (false)
{
@@ -290,9 +293,13 @@ void SqliteDatabase::doHook (const char* db, int pages)
}
if (mWalQ)
{
mWalQ->addJob (jtWAL, std::string ("WAL:") + mHost, BIND_TYPE (&SqliteDatabase::runWal, this));
}
else
boost::thread (BIND_TYPE (&SqliteDatabase::runWal, this)).detach ();
{
m_thread.call (&SqliteDatabase::runWal, this);
}
}
void SqliteDatabase::runWal ()
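
The detached boost::thread is replaced by the ThreadWithCallQueue member declared in the header below, so the WAL checkpoint runs on a single long-lived thread named "sqlitedb"; inside doHook the fallback path becomes (as shown above):

    // Queue runWal() to execute later on the long-lived "sqlitedb" thread
    // instead of spawning and detaching a fresh boost::thread each time.
    m_thread.call (&SqliteDatabase::runWal, this);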

View File

@@ -57,6 +57,8 @@ public:
int getKBUsedAll ();
private:
ThreadWithCallQueue m_thread;
sqlite3* mConnection;
// VFALCO TODO Why do we need an "aux" connection? Should just use a second SqliteDatabase object.
sqlite3* mAuxConnection;

View File

@@ -20,6 +20,118 @@ class ApplicationImp
, LeakChecked <ApplicationImp>
{
public:
// RAII container for a boost::asio::io_service run by beast threads
class IoServiceThread
{
public:
IoServiceThread (String const& name,
int expectedConcurrency,
int numberOfExtraThreads = 0)
: m_name (name)
, m_service (expectedConcurrency)
, m_work (m_service)
{
m_threads.ensureStorageAllocated (numberOfExtraThreads);
for (int i = 0; i < numberOfExtraThreads; ++i)
m_threads.add (new ServiceThread (m_name, m_service));
}
~IoServiceThread ()
{
m_service.stop ();
// the dtor of m_threads will block until each thread exits.
}
// TEMPORARY HACK for compatibility with old code
void runExtraThreads ()
{
for (int i = 0; i < m_threads.size (); ++i)
m_threads [i]->start ();
}
// Run on the caller's thread.
// This will block until stop is issued.
void run ()
{
Thread const* const currentThread (Thread::getCurrentThread());
String previousThreadName;
if (currentThread != nullptr)
{
previousThreadName = currentThread->getThreadName ();
}
else
{
// we're on the main thread
previousThreadName = "main"; // for vanity
}
Thread::setCurrentThreadName (m_name);
m_service.run ();
Thread::setCurrentThreadName (previousThreadName);
}
void stop ()
{
m_service.stop ();
}
boost::asio::io_service& getService ()
{
return m_service;
}
operator boost::asio::io_service& ()
{
return m_service;
}
private:
class ServiceThread : Thread
{
public:
explicit ServiceThread (String const& name, boost::asio::io_service& service)
: Thread (name)
, m_service (service)
{
//startThread ();
}
~ServiceThread ()
{
m_service.stop ();
stopThread (-1); // wait forever
}
void start ()
{
startThread ();
}
void run ()
{
m_service.run ();
}
private:
boost::asio::io_service& m_service;
};
private:
String const m_name;
boost::asio::io_service m_service;
boost::asio::io_service::work m_work;
OwnedArray <ServiceThread> m_threads;
};
//--------------------------------------------------------------------------
static ApplicationImp* createInstance ()
{
return new ApplicationImp;
@@ -40,14 +152,15 @@ public:
//
: SharedSingleton <Application> (SingletonLifetime::neverDestroyed)
#endif
, mIOService ((getConfig ().NODE_SIZE >= 2) ? 2 : 1)
, mIOWork (mIOService)
, m_mainService ("io",
(getConfig ().NODE_SIZE >= 2) ? 2 : 1,
(getConfig ().NODE_SIZE >= 2) ? 1 : 0)
, m_auxService ("auxio", 1, 1)
, mNetOps (new NetworkOPs (&mLedgerMaster))
, m_rpcServerHandler (*mNetOps)
, mTempNodeCache ("NodeCache", 16384, 90)
, mSLECache ("LedgerEntryCache", 4096, 120)
, mSNTPClient (mAuxService)
, mJobQueue (mIOService)
, mSNTPClient (m_auxService)
// VFALCO New stuff
, m_nodeStore (NodeStore::New (
getConfig ().nodeDatabase,
@@ -61,7 +174,7 @@ public:
, mValidations (IValidations::New ())
, mUNL (UniqueNodeList::New ())
, mProofOfWorkFactory (IProofOfWorkFactory::New ())
, mPeers (IPeers::New (mIOService))
, mPeers (IPeers::New (m_mainService))
, m_loadManager (ILoadManager::New ())
// VFALCO End new stuff
// VFALCO TODO replace all NULL with nullptr
@@ -73,7 +186,7 @@ public:
, mRPCDoor (NULL)
, mWSPublicDoor (NULL)
, mWSPrivateDoor (NULL)
, mSweepTimer (mAuxService)
, mSweepTimer (m_auxService)
, mShutdown (false)
{
// VFALCO TODO remove these once the call is thread safe.
@@ -119,7 +232,7 @@ public:
boost::asio::io_service& getIOService ()
{
return mIOService;
return m_mainService;
}
LedgerMaster& getLedgerMaster ()
@@ -258,7 +371,266 @@ public:
{
return mShutdown;
}
void setup ();
//--------------------------------------------------------------------------
static DatabaseCon* openDatabaseCon (const char* fileName,
const char* dbInit[],
int dbCount)
{
return new DatabaseCon (fileName, dbInit, dbCount);
}
void initSqliteDb (int index)
{
switch (index)
{
case 0: mRpcDB = openDatabaseCon ("rpc.db", RpcDBInit, RpcDBCount); break;
case 1: mTxnDB = openDatabaseCon ("transaction.db", TxnDBInit, TxnDBCount); break;
case 2: mLedgerDB = openDatabaseCon ("ledger.db", LedgerDBInit, LedgerDBCount); break;
case 3: mWalletDB = openDatabaseCon ("wallet.db", WalletDBInit, WalletDBCount); break;
};
}
// VFALCO TODO Is it really necessary to init the dbs in parallel?
void initSqliteDbs ()
{
int const count = 4;
ThreadGroup threadGroup (count);
ParallelFor (threadGroup).loop (count, &ApplicationImp::initSqliteDb, this);
}
#ifdef SIGINT
static void sigIntHandler (int)
{
doShutdown = true;
}
#endif
// VFALCO TODO Break this function up into many small initialization segments.
// Or better yet refactor these initializations into RAII classes
// which are members of the Application object.
//
void setup ()
{
// VFALCO NOTE: 0 means use heuristics to determine the thread count.
mJobQueue.setThreadCount (0, getConfig ().RUN_STANDALONE);
mSweepTimer.expires_from_now (boost::posix_time::seconds (10));
mSweepTimer.async_wait (BIND_TYPE (&ApplicationImp::sweep, this));
m_loadManager->startThread ();
#if ! BEAST_WIN32
#ifdef SIGINT
if (!getConfig ().RUN_STANDALONE)
{
struct sigaction sa;
memset (&sa, 0, sizeof (sa));
sa.sa_handler = &ApplicationImp::sigIntHandler;
sigaction (SIGINT, &sa, NULL);
}
#endif
#endif
assert (mTxnDB == NULL);
if (!getConfig ().DEBUG_LOGFILE.empty ())
{
// Let debug messages go to the file but only WARNING or higher to regular output (unless verbose)
Log::setLogFile (getConfig ().DEBUG_LOGFILE);
if (Log::getMinSeverity () > lsDEBUG)
LogPartition::setSeverity (lsDEBUG);
}
if (!getConfig ().RUN_STANDALONE)
mSNTPClient.init (getConfig ().SNTP_SERVERS);
initSqliteDbs ();
leveldb::Options options;
options.create_if_missing = true;
options.block_cache = leveldb::NewLRUCache (getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024);
getApp().getLedgerDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
(getConfig ().getSize (siLgrDBCache) * 1024)));
getApp().getTxnDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
(getConfig ().getSize (siTxnDBCache) * 1024)));
mTxnDB->getDB ()->setupCheckpointing (&mJobQueue);
mLedgerDB->getDB ()->setupCheckpointing (&mJobQueue);
if (!getConfig ().RUN_STANDALONE)
updateTables ();
mFeatures->addInitialFeatures ();
if (getConfig ().START_UP == Config::FRESH)
{
WriteLog (lsINFO, Application) << "Starting new Ledger";
startNewLedger ();
}
else if ((getConfig ().START_UP == Config::LOAD) || (getConfig ().START_UP == Config::REPLAY))
{
WriteLog (lsINFO, Application) << "Loading specified Ledger";
if (!loadOldLedger (getConfig ().START_LEDGER, getConfig ().START_UP == Config::REPLAY))
{
getApp().stop ();
exit (-1);
}
}
else if (getConfig ().START_UP == Config::NETWORK)
{
// This should probably become the default once we have a stable network
if (!getConfig ().RUN_STANDALONE)
mNetOps->needNetworkLedger ();
startNewLedger ();
}
else
startNewLedger ();
mOrderBookDB.setup (getApp().getLedgerMaster ().getCurrentLedger ());
//
// Begin validation and ip maintenance.
// - LocalCredentials maintains local information: including identity and network connection persistence information.
//
m_localCredentials.start ();
//
// Set up UNL.
//
if (!getConfig ().RUN_STANDALONE)
getUNL ().nodeBootstrap ();
mValidations->tune (getConfig ().getSize (siValidationsSize), getConfig ().getSize (siValidationsAge));
m_nodeStore->tune (getConfig ().getSize (siNodeCacheSize), getConfig ().getSize (siNodeCacheAge));
mLedgerMaster.tune (getConfig ().getSize (siLedgerSize), getConfig ().getSize (siLedgerAge));
mSLECache.setTargetSize (getConfig ().getSize (siSLECacheSize));
mSLECache.setTargetAge (getConfig ().getSize (siSLECacheAge));
mLedgerMaster.setMinValidations (getConfig ().VALIDATION_QUORUM);
//
// Allow peer connections.
//
if (!getConfig ().RUN_STANDALONE)
{
try
{
mPeerDoor = PeerDoor::New (
getConfig ().PEER_IP,
getConfig ().PEER_PORT,
getConfig ().PEER_SSL_CIPHER_LIST,
m_mainService);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open peer service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "Peer interface: disabled";
}
//
// Allow RPC connections.
//
if (! getConfig ().getRpcIP().empty () && getConfig ().getRpcPort() != 0)
{
try
{
mRPCDoor = new RPCDoor (m_mainService, m_rpcServerHandler);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open RPC service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "RPC interface: disabled";
}
//
// Allow private WS connections.
//
if (!getConfig ().WEBSOCKET_IP.empty () && getConfig ().WEBSOCKET_PORT)
{
try
{
mWSPrivateDoor = new WSDoor (getConfig ().WEBSOCKET_IP, getConfig ().WEBSOCKET_PORT, false);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open private websocket service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "WS private interface: disabled";
}
//
// Allow public WS connections.
//
if (!getConfig ().WEBSOCKET_PUBLIC_IP.empty () && getConfig ().WEBSOCKET_PUBLIC_PORT)
{
try
{
mWSPublicDoor = new WSDoor (getConfig ().WEBSOCKET_PUBLIC_IP, getConfig ().WEBSOCKET_PUBLIC_PORT, true);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open public websocket service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "WS public interface: disabled";
}
//
// Begin connecting to network.
//
if (!getConfig ().RUN_STANDALONE)
mPeers->start ();
if (getConfig ().RUN_STANDALONE)
{
WriteLog (lsWARNING, Application) << "Running in standalone mode";
mNetOps->setStandAlone ();
}
else
{
// VFALCO NOTE the state timer resets the deadlock detector.
//
mNetOps->setStateTimer ();
}
}
void run ();
void stop ();
void sweep ();
@@ -270,13 +642,12 @@ private:
bool loadOldLedger (const std::string&, bool);
private:
boost::asio::io_service mIOService;
boost::asio::io_service mAuxService;
// The lifetime of the io_service::work object informs the io_service
// of when the work starts and finishes. io_service::run() will not exit
// while the work object exists.
//
boost::asio::io_service::work mIOWork;
IoServiceThread m_mainService;
IoServiceThread m_auxService;
//boost::asio::io_service mIOService;
//boost::asio::io_service mAuxService;
//boost::asio::io_service::work mIOWork;
MasterLockType mMasterLock;
@@ -330,281 +701,24 @@ void ApplicationImp::stop ()
WriteLog (lsINFO, Application) << "Received shutdown request";
StopSustain ();
mShutdown = true;
mIOService.stop ();
m_mainService.stop ();
m_nodeStore = nullptr;
mValidations->flush ();
mAuxService.stop ();
m_auxService.stop ();
mJobQueue.shutdown ();
WriteLog (lsINFO, Application) << "Stopped: " << mIOService.stopped ();
//WriteLog (lsINFO, Application) << "Stopped: " << mIOService.stopped
mShutdown = false;
}
static void InitDB (DatabaseCon** dbCon, const char* fileName, const char* dbInit[], int dbCount)
{
*dbCon = new DatabaseCon (fileName, dbInit, dbCount);
}
#ifdef SIGINT
void sigIntHandler (int)
{
doShutdown = true;
}
#endif
// VFALCO TODO Figure this out it looks like the wrong tool
static void runAux (boost::asio::io_service& svc)
{
setCallingThreadName ("aux");
svc.run ();
}
static void runIO (boost::asio::io_service& io)
{
setCallingThreadName ("io");
io.run ();
}
// VFALCO TODO Break this function up into many small initialization segments.
// Or better yet refactor these initializations into RAII classes
// which are members of the Application object.
//
void ApplicationImp::setup ()
{
// VFALCO NOTE: 0 means use heuristics to determine the thread count.
mJobQueue.setThreadCount (0, getConfig ().RUN_STANDALONE);
mSweepTimer.expires_from_now (boost::posix_time::seconds (10));
mSweepTimer.async_wait (BIND_TYPE (&ApplicationImp::sweep, this));
m_loadManager->startThread ();
#if ! BEAST_WIN32
#ifdef SIGINT
if (!getConfig ().RUN_STANDALONE)
{
struct sigaction sa;
memset (&sa, 0, sizeof (sa));
sa.sa_handler = sigIntHandler;
sigaction (SIGINT, &sa, NULL);
}
#endif
#endif
assert (mTxnDB == NULL);
if (!getConfig ().DEBUG_LOGFILE.empty ())
{
// Let debug messages go to the file but only WARNING or higher to regular output (unless verbose)
Log::setLogFile (getConfig ().DEBUG_LOGFILE);
if (Log::getMinSeverity () > lsDEBUG)
LogPartition::setSeverity (lsDEBUG);
}
boost::thread (BIND_TYPE (runAux, boost::ref (mAuxService))).detach ();
if (!getConfig ().RUN_STANDALONE)
mSNTPClient.init (getConfig ().SNTP_SERVERS);
//
// Construct databases.
//
boost::thread t1 (BIND_TYPE (&InitDB, &mRpcDB, "rpc.db", RpcDBInit, RpcDBCount));
boost::thread t2 (BIND_TYPE (&InitDB, &mTxnDB, "transaction.db", TxnDBInit, TxnDBCount));
boost::thread t3 (BIND_TYPE (&InitDB, &mLedgerDB, "ledger.db", LedgerDBInit, LedgerDBCount));
boost::thread t4 (BIND_TYPE (&InitDB, &mWalletDB, "wallet.db", WalletDBInit, WalletDBCount));
t1.join ();
t2.join ();
t3.join ();
t4.join ();
leveldb::Options options;
options.create_if_missing = true;
options.block_cache = leveldb::NewLRUCache (getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024);
getApp().getLedgerDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
(getConfig ().getSize (siLgrDBCache) * 1024)));
getApp().getTxnDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
(getConfig ().getSize (siTxnDBCache) * 1024)));
mTxnDB->getDB ()->setupCheckpointing (&mJobQueue);
mLedgerDB->getDB ()->setupCheckpointing (&mJobQueue);
if (!getConfig ().RUN_STANDALONE)
updateTables ();
mFeatures->addInitialFeatures ();
if (getConfig ().START_UP == Config::FRESH)
{
WriteLog (lsINFO, Application) << "Starting new Ledger";
startNewLedger ();
}
else if ((getConfig ().START_UP == Config::LOAD) || (getConfig ().START_UP == Config::REPLAY))
{
WriteLog (lsINFO, Application) << "Loading specified Ledger";
if (!loadOldLedger (getConfig ().START_LEDGER, getConfig ().START_UP == Config::REPLAY))
{
getApp().stop ();
exit (-1);
}
}
else if (getConfig ().START_UP == Config::NETWORK)
{
// This should probably become the default once we have a stable network
if (!getConfig ().RUN_STANDALONE)
mNetOps->needNetworkLedger ();
startNewLedger ();
}
else
startNewLedger ();
mOrderBookDB.setup (getApp().getLedgerMaster ().getCurrentLedger ());
//
// Begin validation and ip maintenance.
// - LocalCredentials maintains local information: including identity and network connection persistence information.
//
m_localCredentials.start ();
//
// Set up UNL.
//
if (!getConfig ().RUN_STANDALONE)
getUNL ().nodeBootstrap ();
mValidations->tune (getConfig ().getSize (siValidationsSize), getConfig ().getSize (siValidationsAge));
m_nodeStore->tune (getConfig ().getSize (siNodeCacheSize), getConfig ().getSize (siNodeCacheAge));
mLedgerMaster.tune (getConfig ().getSize (siLedgerSize), getConfig ().getSize (siLedgerAge));
mSLECache.setTargetSize (getConfig ().getSize (siSLECacheSize));
mSLECache.setTargetAge (getConfig ().getSize (siSLECacheAge));
mLedgerMaster.setMinValidations (getConfig ().VALIDATION_QUORUM);
//
// Allow peer connections.
//
if (!getConfig ().RUN_STANDALONE)
{
try
{
mPeerDoor = PeerDoor::New (
getConfig ().PEER_IP,
getConfig ().PEER_PORT,
getConfig ().PEER_SSL_CIPHER_LIST,
mIOService);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open peer service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "Peer interface: disabled";
}
//
// Allow RPC connections.
//
if (! getConfig ().getRpcIP().empty () && getConfig ().getRpcPort() != 0)
{
try
{
mRPCDoor = new RPCDoor (mIOService, m_rpcServerHandler);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open RPC service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "RPC interface: disabled";
}
//
// Allow private WS connections.
//
if (!getConfig ().WEBSOCKET_IP.empty () && getConfig ().WEBSOCKET_PORT)
{
try
{
mWSPrivateDoor = WSDoor::createWSDoor (getConfig ().WEBSOCKET_IP, getConfig ().WEBSOCKET_PORT, false);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open private websocket service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "WS private interface: disabled";
}
//
// Allow public WS connections.
//
if (!getConfig ().WEBSOCKET_PUBLIC_IP.empty () && getConfig ().WEBSOCKET_PUBLIC_PORT)
{
try
{
mWSPublicDoor = WSDoor::createWSDoor (getConfig ().WEBSOCKET_PUBLIC_IP, getConfig ().WEBSOCKET_PUBLIC_PORT, true);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open public websocket service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "WS public interface: disabled";
}
//
// Begin connecting to network.
//
if (!getConfig ().RUN_STANDALONE)
mPeers->start ();
if (getConfig ().RUN_STANDALONE)
{
WriteLog (lsWARNING, Application) << "Running in standalone mode";
mNetOps->setStandAlone ();
}
else
{
// VFALCO NOTE the state timer resets the deadlock detector.
//
mNetOps->setStateTimer ();
}
}
void ApplicationImp::run ()
{
if (getConfig ().NODE_SIZE >= 2)
{
boost::thread (BIND_TYPE (runIO, boost::ref (mIOService))).detach ();
}
// VFALCO TODO The unit tests crash if we try to
// run these threads in the IoService constructor
// so this hack makes them start later.
//
m_mainService.runExtraThreads ();
if (!getConfig ().RUN_STANDALONE)
{
@@ -614,7 +728,7 @@ void ApplicationImp::run ()
getApp().getLoadManager ().activateDeadlockDetector ();
}
mIOService.run (); // This blocks
m_mainService.run (); // This blocks until the io_service is stopped.
if (mWSPublicDoor)
mWSPublicDoor->stop ();

View File

@@ -145,7 +145,20 @@ public:
{
if (m_shouldLog)
{
#if BEAST_MSVC
if (beast_isRunningUnderDebugger ())
{
Logger::outputDebugString (message);
}
else
{
std::cout << message.toStdString () << std::endl;
}
#else
std::cout << message.toStdString () << std::endl;
#endif
}
}

View File

@@ -21,9 +21,36 @@ SETUP_LOG (WSDoor)
//
// VFALCO NOTE NetworkOPs isn't used here...
//
void WSDoor::startListening ()
WSDoor::WSDoor (std::string const& strIp, int iPort, bool bPublic)
: Thread ("websocket")
, mPublic (bPublic)
, mIp (strIp)
, mPort (iPort)
{
setCallingThreadName ("websocket");
startThread ();
}
WSDoor::~WSDoor ()
{
{
CriticalSection::ScopedLockType lock (m_endpointLock);
if (m_endpoint != nullptr)
m_endpoint->stop ();
}
signalThreadShouldExit ();
waitForThreadToExit ();
}
void WSDoor::run ()
{
WriteLog (lsINFO, WSDoor) << boost::str (boost::format ("Websocket: %s: Listening: %s %d ")
% (mPublic ? "Public" : "Private")
% mIp
% mPort);
// Generate a single SSL context for use by all connections.
boost::shared_ptr<boost::asio::ssl::context> mCtx;
mCtx = boost::make_shared<boost::asio::ssl::context> (boost::asio::ssl::context::sslv23);
@@ -35,11 +62,13 @@ void WSDoor::startListening ()
SSL_CTX_set_tmp_dh_callback (mCtx->native_handle (), handleTmpDh);
// Construct a single handler for all requests.
websocketpp::server_autotls::handler::ptr handler (new WSServerHandler<websocketpp::server_autotls> (mCtx, mPublic));
// Construct a websocket server.
mSEndpoint = new websocketpp::server_autotls (handler);
{
CriticalSection::ScopedLockType lock (m_endpointLock);
m_endpoint = new websocketpp::server_autotls (handler);
}
// mEndpoint->alog().unset_level(websocketpp::log::alevel::ALL);
// mEndpoint->elog().unset_level(websocketpp::log::elevel::ALL);
@@ -47,7 +76,7 @@ void WSDoor::startListening ()
// Call the main-event-loop of the websocket server.
try
{
mSEndpoint->listen (
m_endpoint->listen (
boost::asio::ip::tcp::endpoint (
boost::asio::ip::address ().from_string (mIp), mPort));
}
@@ -60,7 +89,7 @@ void WSDoor::startListening ()
// https://github.com/zaphoyd/websocketpp/issues/98
try
{
mSEndpoint->get_io_service ().run ();
m_endpoint->get_io_service ().run ();
break;
}
catch (websocketpp::exception& e)
@@ -70,32 +99,18 @@ void WSDoor::startListening ()
}
}
delete mSEndpoint;
}
WSDoor* WSDoor::createWSDoor (const std::string& strIp, const int iPort, bool bPublic)
{
WSDoor* wdpResult = new WSDoor (strIp, iPort, bPublic);
WriteLog (lsINFO, WSDoor) <<
boost::str (boost::format ("Websocket: %s: Listening: %s %d ")
% (bPublic ? "Public" : "Private")
% strIp
% iPort);
wdpResult->mThread = new boost::thread (BIND_TYPE (&WSDoor::startListening, wdpResult));
return wdpResult;
delete m_endpoint;
}
void WSDoor::stop ()
{
if (mThread)
{
if (mSEndpoint)
mSEndpoint->stop ();
CriticalSection::ScopedLockType lock (m_endpointLock);
mThread->join ();
if (m_endpoint != nullptr)
m_endpoint->stop ();
}
signalThreadShouldExit ();
waitForThreadToExit ();
}

View File

@@ -7,28 +7,24 @@
#ifndef RIPPLE_WSDOOR_RIPPLEHEADER
#define RIPPLE_WSDOOR_RIPPLEHEADER
class WSDoor : LeakChecked <WSDoor>
class WSDoor : protected Thread, LeakChecked <WSDoor>
{
private:
websocketpp::server_autotls* mSEndpoint;
public:
WSDoor (std::string const& strIp, int iPort, bool bPublic);
boost::thread* mThread;
~WSDoor ();
void stop ();
private:
void run ();
private:
ScopedPointer <websocketpp::server_autotls> m_endpoint;
CriticalSection m_endpointLock;
bool mPublic;
std::string mIp;
int mPort;
void startListening ();
public:
WSDoor (const std::string& strIp, int iPort, bool bPublic) : mSEndpoint (0), mThread (0), mPublic (bPublic), mIp (strIp), mPort (iPort)
{
;
}
void stop ();
static WSDoor* createWSDoor (const std::string& strIp, const int iPort, bool bPublic);
};
#endif
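
With WSDoor now a Thread subclass that starts itself in its constructor, the createWSDoor factory and the manually managed boost::thread disappear; construction in Application.cpp above becomes a plain new:

    // The constructor starts the listening thread; stop() (and the destructor)
    // stops the endpoint and waits for the thread to exit.
    mWSPrivateDoor = new WSDoor (getConfig ().WEBSOCKET_IP,
                                 getConfig ().WEBSOCKET_PORT,
                                 false);     // bPublic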

View File

@@ -60,8 +60,6 @@
#include <boost/unordered_set.hpp>
#include <boost/weak_ptr.hpp>
//#include "../ripple_sqlite/ripple_sqlite.h" // for SqliteDatabase.cpp
#include "../ripple_core/ripple_core.h"
#include "beast/modules/beast_db/beast_db.h"

View File

@@ -6,11 +6,9 @@
SETUP_LOG (JobQueue)
JobQueue::JobQueue (boost::asio::io_service& svc)
: mLastJob (0)
, mThreadCount (0)
, mShuttingDown (false)
, mIOService (svc)
JobQueue::JobQueue ()
: m_workers (*this, 0)
, mLastJob (0)
{
mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000);
mJobLoads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000);
@@ -30,9 +28,13 @@ JobQueue::JobQueue (boost::asio::io_service& svc)
mJobLoads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500);
}
JobQueue::~JobQueue ()
{
}
void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc)
{
addLimitJob(type, name, 0, jobFunc);
addLimitJob (type, name, 0, jobFunc);
}
void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, const FUNCTION_TYPE<void (Job&)>& jobFunc)
@@ -41,21 +43,26 @@ void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, co
boost::mutex::scoped_lock sl (mJobLock);
if (type != jtCLIENT) // FIXME: Workaround incorrect client shutdown ordering
assert (mThreadCount != 0); // do not add jobs to a queue with no threads
// FIXME: Workaround incorrect client shutdown ordering
// do not add jobs to a queue with no threads
bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
std::pair< std::set <Job>::iterator, bool > it =
mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc));
it.first->peekEvent().start(); // start timing how long it stays in the queue
++mJobCounts[type].first;
mJobCond.notify_one ();
m_workers.addTask ();
}
int JobQueue::getJobCount (JobType t)
{
boost::mutex::scoped_lock sl (mJobLock);
std::map< JobType, std::pair<int, int> >::iterator c = mJobCounts.find (t);
JobCounts::iterator c = mJobCounts.find (t);
return (c == mJobCounts.end ()) ? 0 : c->second.first;
}
@@ -63,7 +70,8 @@ int JobQueue::getJobCountTotal (JobType t)
{
boost::mutex::scoped_lock sl (mJobLock);
std::map< JobType, std::pair<int, int> >::iterator c = mJobCounts.find (t);
JobCounts::iterator c = mJobCounts.find (t);
return (c == mJobCounts.end ()) ? 0 : (c->second.first + c->second.second);
}
@@ -74,11 +82,13 @@ int JobQueue::getJobCountGE (JobType t)
boost::mutex::scoped_lock sl (mJobLock);
typedef std::map< JobType, std::pair<int, int> >::value_type jt_int_pair;
BOOST_FOREACH (const jt_int_pair & it, mJobCounts)
typedef JobCounts::value_type jt_int_pair;
if (it.first >= t)
ret += it.second.first;
BOOST_FOREACH (jt_int_pair const& it, mJobCounts)
{
if (it.first >= t)
ret += it.second.first;
}
return ret;
}
@@ -89,11 +99,15 @@ std::vector< std::pair<JobType, std::pair<int, int> > > JobQueue::getJobCounts (
std::vector< std::pair<JobType, std::pair<int, int> > > ret;
boost::mutex::scoped_lock sl (mJobLock);
ret.reserve (mJobCounts.size ());
typedef std::map< JobType, std::pair<int, int> >::value_type jt_int_pair;
typedef JobCounts::value_type jt_int_pair;
BOOST_FOREACH (const jt_int_pair & it, mJobCounts)
ret.push_back (it);
{
ret.push_back (it);
}
return ret;
}
@@ -101,9 +115,10 @@ std::vector< std::pair<JobType, std::pair<int, int> > > JobQueue::getJobCounts (
Json::Value JobQueue::getJson (int)
{
Json::Value ret (Json::objectValue);
boost::mutex::scoped_lock sl (mJobLock);
ret["threads"] = mThreadCount;
ret["threads"] = m_workers.getNumberOfThreads ();
Json::Value priorities = Json::arrayValue;
@@ -116,7 +131,7 @@ Json::Value JobQueue::getJson (int)
int jobCount, threadCount;
bool isOver;
mJobLoads[i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver);
std::map< JobType, std::pair<int, int> >::iterator it = mJobCounts.find (static_cast<JobType> (i));
JobCounts::iterator it = mJobCounts.find (static_cast<JobType> (i));
if (it == mJobCounts.end ())
{
@@ -165,6 +180,7 @@ Json::Value JobQueue::getJson (int)
bool JobQueue::isOverloaded ()
{
int count = 0;
boost::mutex::scoped_lock sl (mJobLock);
for (int i = 0; i < NUM_JOB_TYPES; ++i)
@@ -174,16 +190,13 @@ bool JobQueue::isOverloaded ()
return count > 0;
}
// shut down the job queue without completing pending jobs
//
void JobQueue::shutdown ()
{
// shut down the job queue without completing pending jobs
WriteLog (lsINFO, JobQueue) << "Job queue shutting down";
boost::mutex::scoped_lock sl (mJobLock);
mShuttingDown = true;
mJobCond.notify_all ();
while (mThreadCount != 0)
mJobCond.wait (sl);
m_workers.pauseAllThreadsAndWait ();
}
// set the number of thread serving the job queue to precisely this number
@@ -195,7 +208,7 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
}
else if (c == 0)
{
c = boost::thread::hardware_concurrency ();
c = SystemStats::getNumCpus ();
// VFALCO NOTE According to boost, hardware_concurrency cannot return
// negative numbers.
@@ -210,113 +223,83 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
WriteLog (lsINFO, JobQueue) << "Auto-tuning to " << c << " validation/transaction/proposal threads";
}
// VFALCO TODO Split the function up. The lower part actually does the "do",
// The part above this comment figures out the value for numThreads
//
boost::mutex::scoped_lock sl (mJobLock);
while (mJobCounts[jtDEATH].first != 0)
{
mJobCond.wait (sl);
}
while (mThreadCount < c)
{
++mThreadCount;
boost::thread (BIND_TYPE (&JobQueue::threadEntry, this)).detach ();
}
while (mThreadCount > c)
{
if (mJobCounts[jtDEATH].first != 0)
{
mJobCond.wait (sl);
}
else
{
mJobSet.insert (Job (jtDEATH, 0));
++ (mJobCounts[jtDEATH].first);
}
}
mJobCond.notify_one (); // in case we sucked up someone else's signal
m_workers.setNumberOfThreads (c);
}
bool JobQueue::getJob(Job& job)
{
if (mJobSet.empty() || mShuttingDown)
return false;
bool gotJob = false;
std::set<Job>::iterator it = mJobSet.begin ();
while (1)
if (! mJobSet.empty ())
{
// Are we out of jobs?
if (it == mJobSet.end())
return false;
std::set<Job>::iterator it = mJobSet.begin ();
// Does this job have no limit?
if (it->getLimit() == 0)
break;
for (;;)
{
// VFALCO NOTE how can we be out of jobs if we just checked mJobSet.empty ()?
//
// Are we out of jobs?
if (it == mJobSet.end())
return false; // VFALCO TODO get rid of this return from the middle
// Is this job category below the limit?
if (mJobCounts[it->getType()].second < it->getLimit())
break;
// Does this job have no limit?
if (it->getLimit() == 0)
break;
// Try the next job, if any
++it;
// Is this job category below the limit?
if (mJobCounts[it->getType()].second < it->getLimit())
break;
// Try the next job, if any
++it;
}
job = *it;
mJobSet.erase (it);
gotJob = true;
}
job = *it;
mJobSet.erase (it);
return true;
return gotJob;
}
// do jobs until asked to stop
void JobQueue::threadEntry ()
void JobQueue::processTask ()
{
boost::mutex::scoped_lock sl (mJobLock);
while (1)
{
JobType type;
// This lock shouldn't be needed
boost::mutex::scoped_lock lock (mJobLock);
setCallingThreadName ("waiting");
JobType type (jtINVALID);
{
Job job;
while (!getJob(job))
bool const haveJob = getJob (job);
if (haveJob)
{
if (mShuttingDown)
{
--mThreadCount;
mJobCond.notify_all();
return;
}
mJobCond.wait (sl);
type = job.getType ();
// VFALCO TODO Replace with Atomic <>
--(mJobCounts[type].first);
++(mJobCounts[type].second);
lock.unlock ();
Thread::setCurrentThreadName (Job::toString (type));
WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job";
job.doJob ();
}
type = job.getType ();
-- (mJobCounts[type].first);
// must destroy job, here, without holding lock
}
if (type == jtDEATH)
{
--mThreadCount;
mJobCond.notify_all();
return;
}
++ (mJobCounts[type].second);
sl.unlock ();
setCallingThreadName (Job::toString (type));
WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job";
job.doJob ();
} // must destroy job without holding lock
sl.lock ();
-- (mJobCounts[type].second);
if (type != jtINVALID)
{
lock.lock ();
-- (mJobCounts[type].second);
}
}
}
// vim:ts=4
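
The net effect is that JobQueue no longer owns threads, a condition variable, or jtDEATH sentinel jobs: it is a Workers::Callback, each addJob() maps to exactly one m_workers.addTask(), and a Workers thread calls processTask() to pop and run the best eligible Job. Callers are unchanged, e.g. the WAL hook earlier in this commit:

    mWalQ->addJob (jtWAL, std::string ("WAL:") + mHost,
                   BIND_TYPE (&SqliteDatabase::runWal, this));

    // shutdown () now just calls m_workers.pauseAllThreadsAndWait ();
    // jobs still in mJobSet are not completed, matching the old behaviour.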

View File

@@ -4,13 +4,17 @@
*/
//==============================================================================
#ifndef RIPPLE_JOBQUEUE_H
#define RIPPLE_JOBQUEUE_H
#ifndef RIPPLE_JOBQUEUE_H_INCLUDED
#define RIPPLE_JOBQUEUE_H_INCLUDED
class JobQueue
class JobQueue : private Workers::Callback
{
public:
explicit JobQueue (boost::asio::io_service&);
typedef std::map<JobType, std::pair<int, int > > JobCounts;
JobQueue ();
~JobQueue ();
// VFALCO TODO make convenience functions that allow the caller to not
// have to call bind.
@@ -46,22 +50,17 @@ public:
Json::Value getJson (int c = 0);
private:
void threadEntry ();
boost::mutex mJobLock;
boost::condition_variable mJobCond;
uint64 mLastJob;
std::set <Job> mJobSet;
LoadMonitor mJobLoads [NUM_JOB_TYPES];
int mThreadCount;
bool mShuttingDown;
boost::asio::io_service& mIOService;
std::map<JobType, std::pair<int, int > > mJobCounts;
bool getJob (Job& job);
void processTask ();
private:
Workers m_workers;
boost::mutex mJobLock; // VFALCO TODO Replace with CriticalSection
uint64 mLastJob;
std::set <Job> mJobSet;
LoadMonitor mJobLoads [NUM_JOB_TYPES];
JobCounts mJobCounts;
};
#endif

BIN
rippled.1

Binary file not shown.