Tidy up JobQueue, add ripple_core module

This commit is contained in:
Vinnie Falco
2013-06-22 16:35:40 -07:00
parent fcb4c35fce
commit 64c8d406df
8 changed files with 107 additions and 54 deletions

View File

@@ -169,6 +169,7 @@
</Link> </Link>
</ItemDefinitionGroup> </ItemDefinitionGroup>
<ItemGroup> <ItemGroup>
<ClCompile Include="..\..\modules\ripple_core\ripple_core.cpp" />
<ClCompile Include="..\..\modules\ripple_leveldb\ripple_leveldb.cpp" /> <ClCompile Include="..\..\modules\ripple_leveldb\ripple_leveldb.cpp" />
<ClCompile Include="..\..\modules\ripple_sqlite\ripple_sqlite.c"> <ClCompile Include="..\..\modules\ripple_sqlite\ripple_sqlite.c">
<WarningLevel Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Level4</WarningLevel> <WarningLevel Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Level4</WarningLevel>
@@ -1517,6 +1518,7 @@
<ClInclude Include="..\..\modules\ripple_basics\utility\ripple_Time.h" /> <ClInclude Include="..\..\modules\ripple_basics\utility\ripple_Time.h" />
<ClInclude Include="..\..\modules\ripple_basics\utility\ripple_UptimeTimer.h" /> <ClInclude Include="..\..\modules\ripple_basics\utility\ripple_UptimeTimer.h" />
<ClInclude Include="..\..\modules\ripple_client\ripple_client.h" /> <ClInclude Include="..\..\modules\ripple_client\ripple_client.h" />
<ClInclude Include="..\..\modules\ripple_core\ripple_core.h" />
<ClInclude Include="..\..\modules\ripple_data\crypto\ripple_Base58.h" /> <ClInclude Include="..\..\modules\ripple_data\crypto\ripple_Base58.h" />
<ClInclude Include="..\..\modules\ripple_data\crypto\ripple_Base58Data.h" /> <ClInclude Include="..\..\modules\ripple_data\crypto\ripple_Base58Data.h" />
<ClInclude Include="..\..\modules\ripple_data\crypto\ripple_CBigNum.h" /> <ClInclude Include="..\..\modules\ripple_data\crypto\ripple_CBigNum.h" />

View File

@@ -139,6 +139,12 @@
<Filter Include="1. Modules\ripple_leveldb"> <Filter Include="1. Modules\ripple_leveldb">
<UniqueIdentifier>{82d79c26-4932-4a48-b134-09969f45d75a}</UniqueIdentifier> <UniqueIdentifier>{82d79c26-4932-4a48-b134-09969f45d75a}</UniqueIdentifier>
</Filter> </Filter>
<Filter Include="1. Modules\ripple_core">
<UniqueIdentifier>{ce56fa36-0012-44ab-a6dd-da0391f32ba3}</UniqueIdentifier>
</Filter>
<Filter Include="1. Modules\ripple_core\functional">
<UniqueIdentifier>{f7a586fa-b21a-4c7b-b87e-5ac62f2758e4}</UniqueIdentifier>
</Filter>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClCompile Include="..\..\src\cpp\protobuf_core.cpp"> <ClCompile Include="..\..\src\cpp\protobuf_core.cpp">
@@ -888,6 +894,9 @@
<ClCompile Include="..\..\modules\ripple_leveldb\ripple_leveldb.cpp"> <ClCompile Include="..\..\modules\ripple_leveldb\ripple_leveldb.cpp">
<Filter>1. Modules\ripple_leveldb</Filter> <Filter>1. Modules\ripple_leveldb</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="..\..\modules\ripple_core\ripple_core.cpp">
<Filter>1. Modules\ripple_core</Filter>
</ClCompile>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="..\..\src\cpp\protobuf\src\google\protobuf\service.h"> <ClInclude Include="..\..\src\cpp\protobuf\src\google\protobuf\service.h">
@@ -1704,6 +1713,9 @@
<ClInclude Include="..\..\modules\ripple_leveldb\ripple_leveldb.h"> <ClInclude Include="..\..\modules\ripple_leveldb\ripple_leveldb.h">
<Filter>1. Modules\ripple_leveldb</Filter> <Filter>1. Modules\ripple_leveldb</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="..\..\modules\ripple_core\ripple_core.h">
<Filter>1. Modules\ripple_core</Filter>
</ClInclude>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<None Include="..\..\src\cpp\protobuf\src\google\protobuf\descriptor.proto"> <None Include="..\..\src\cpp\protobuf\src\google\protobuf\descriptor.proto">

View File

@@ -104,6 +104,7 @@ COMPILED_FILES = [
'Subtrees/beast/modules/beast_core/beast_core.cpp', 'Subtrees/beast/modules/beast_core/beast_core.cpp',
'Subtrees/beast/modules/beast_basics/beast_basics.cpp', 'Subtrees/beast/modules/beast_basics/beast_basics.cpp',
'modules/ripple_basics/ripple_basics.cpp', 'modules/ripple_basics/ripple_basics.cpp',
'modules/ripple_core/ripple_core.cpp',
'modules/ripple_data/ripple_data.cpp', 'modules/ripple_data/ripple_data.cpp',
'modules/ripple_json/ripple_json.cpp', 'modules/ripple_json/ripple_json.cpp',
'modules/ripple_leveldb/ripple_leveldb.cpp', 'modules/ripple_leveldb/ripple_leveldb.cpp',

View File

@@ -0,0 +1,16 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
/** Add this to get the @ref ripple_core module.
@file ripple_core.cpp
@ingroup ripple_core
*/
#include "ripple_core.h"
//#include "src/cpp/ripple/ripple_Job.h"
//#include "src/cpp/ripple/ripple_JobQueue.h"

View File

@@ -0,0 +1,24 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
/** Include this to get the @ref ripple_core module.
@file ripple_core.h
@ingroup ripple_core
*/
/** Core classes.
These objects form the execution framework in which the Ripple
protocol is implemented.
@defgroup ripple_core
*/
#ifndef RIPPLE_CORE_RIPPLEHEADER
#define RIPPLE_CORE_RIPPLEHEADER
#endif

View File

@@ -87,8 +87,17 @@ public:
std::vector<std::string> IPS; // Peer IPs from rippled.cfg. std::vector<std::string> IPS; // Peer IPs from rippled.cfg.
std::vector<std::string> SNTP_SERVERS; // SNTP servers from rippled.cfg. std::vector<std::string> SNTP_SERVERS; // SNTP servers from rippled.cfg.
enum StartUpType { FRESH, NORMAL, LOAD, NETWORK }; enum StartUpType
{
FRESH,
NORMAL,
LOAD,
NETWORK
};
StartUpType START_UP; StartUpType START_UP;
std::string START_LEDGER; std::string START_LEDGER;
// Database // Database
@@ -101,6 +110,16 @@ public:
int LEDGER_PROPOSAL_DELAY_SECONDS; int LEDGER_PROPOSAL_DELAY_SECONDS;
int LEDGER_AVALANCHE_SECONDS; int LEDGER_AVALANCHE_SECONDS;
bool LEDGER_CREATOR; // Should be false unless we are starting a new ledger. bool LEDGER_CREATOR; // Should be false unless we are starting a new ledger.
/** Operate in stand-alone mode.
In stand alone mode:
- Peer connections are not attempted or accepted
- The ledger is not advanced automatically.
- If no ledger is loaded, the default ledger with the root
account is created.
*/
bool RUN_STANDALONE; bool RUN_STANDALONE;
// Note: The following parameters do not relate to the UNL or trust at all // Note: The following parameters do not relate to the UNL or trust at all

View File

@@ -7,24 +7,27 @@
SETUP_LOG (JobQueue) SETUP_LOG (JobQueue)
JobQueue::JobQueue (boost::asio::io_service& svc) JobQueue::JobQueue (boost::asio::io_service& svc)
: mLastJob (0), mThreadCount (0), mShuttingDown (false), mIOThreadCount (0), mMaxIOThreadCount (1), mIOService (svc) : mLastJob (0)
, mThreadCount (0)
, mShuttingDown (false)
, mIOService (svc)
{ {
mJobLoads[jtPUBOLDLEDGER].setTargetLatency (10000, 15000); mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000);
mJobLoads[jtVALIDATION_ut].setTargetLatency (2000, 5000); mJobLoads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000);
mJobLoads[jtPROOFWORK].setTargetLatency (2000, 5000); mJobLoads [ jtPROOFWORK ].setTargetLatency (2000, 5000);
mJobLoads[jtTRANSACTION].setTargetLatency (250, 1000); mJobLoads [ jtTRANSACTION ].setTargetLatency (250, 1000);
mJobLoads[jtPROPOSAL_ut].setTargetLatency (500, 1250); mJobLoads [ jtPROPOSAL_ut ].setTargetLatency (500, 1250);
mJobLoads[jtPUBLEDGER].setTargetLatency (3000, 4500); mJobLoads [ jtPUBLEDGER ].setTargetLatency (3000, 4500);
mJobLoads[jtWAL].setTargetLatency (1000, 2500); mJobLoads [ jtWAL ].setTargetLatency (1000, 2500);
mJobLoads[jtVALIDATION_t].setTargetLatency (500, 1500); mJobLoads [ jtVALIDATION_t ].setTargetLatency (500, 1500);
mJobLoads[jtWRITE].setTargetLatency (750, 1500); mJobLoads [ jtWRITE ].setTargetLatency (750, 1500);
mJobLoads[jtTRANSACTION_l].setTargetLatency (100, 500); mJobLoads [ jtTRANSACTION_l ].setTargetLatency (100, 500);
mJobLoads[jtPROPOSAL_t].setTargetLatency (100, 500); mJobLoads [ jtPROPOSAL_t ].setTargetLatency (100, 500);
mJobLoads[jtCLIENT].setTargetLatency (2000, 5000); mJobLoads [ jtCLIENT ].setTargetLatency (2000, 5000);
mJobLoads[jtPEER].setTargetLatency (200, 1250); mJobLoads [ jtPEER ].setTargetLatency (200, 1250);
mJobLoads[jtDISK].setTargetLatency (500, 1000); mJobLoads [ jtDISK ].setTargetLatency (500, 1000);
mJobLoads[jtACCEPTLEDGER].setTargetLatency (1000, 2500); mJobLoads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500);
} }
void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc) void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc)
@@ -177,13 +180,18 @@ void JobQueue::shutdown ()
void JobQueue::setThreadCount (int c) void JobQueue::setThreadCount (int c)
{ {
if (theConfig.RUN_STANDALONE) if (theConfig.RUN_STANDALONE)
{
c = 1; c = 1;
}
else if (c == 0) else if (c == 0)
{ {
c = boost::thread::hardware_concurrency (); c = boost::thread::hardware_concurrency ();
// VFALCO NOTE According to boost, hardware_concurrency cannot return
// negative numbers/
//
if (c < 0) if (c < 0)
c = 2; c = 2; // VFALCO NOTE Why 2?
if (c > 4) // I/O will bottleneck if (c > 4) // I/O will bottleneck
c = 4; c = 4;
@@ -192,12 +200,15 @@ void JobQueue::setThreadCount (int c)
WriteLog (lsINFO, JobQueue) << "Auto-tuning to " << c << " validation/transaction/proposal threads"; WriteLog (lsINFO, JobQueue) << "Auto-tuning to " << c << " validation/transaction/proposal threads";
} }
// VFALCO TODO Split the function up. The lower part actually does the "do",
// The part above this comment figures out the value for numThreads
//
boost::mutex::scoped_lock sl (mJobLock); boost::mutex::scoped_lock sl (mJobLock);
mMaxIOThreadCount = 1 + (c / 3);
while (mJobCounts[jtDEATH].first != 0) while (mJobCounts[jtDEATH].first != 0)
{
mJobCond.wait (sl); mJobCond.wait (sl);
}
while (mThreadCount < c) while (mThreadCount < c)
{ {
@@ -208,7 +219,9 @@ void JobQueue::setThreadCount (int c)
while (mThreadCount > c) while (mThreadCount > c)
{ {
if (mJobCounts[jtDEATH].first != 0) if (mJobCounts[jtDEATH].first != 0)
{
mJobCond.wait (sl); mJobCond.wait (sl);
}
else else
{ {
mJobSet.insert (Job (jtDEATH, 0)); mJobSet.insert (Job (jtDEATH, 0));
@@ -219,27 +232,6 @@ void JobQueue::setThreadCount (int c)
mJobCond.notify_one (); // in case we sucked up someone else's signal mJobCond.notify_one (); // in case we sucked up someone else's signal
} }
void JobQueue::IOThread (boost::mutex::scoped_lock& sl)
{
// call with a lock
++mIOThreadCount;
sl.unlock ();
setCallingThreadName ("IO+");
try
{
mIOService.poll ();
}
catch (...)
{
WriteLog (lsWARNING, JobQueue) << "Exception in IOThread";
}
setCallingThreadName ("waiting");
sl.lock ();
--mIOThreadCount;
}
// do jobs until asked to stop // do jobs until asked to stop
void JobQueue::threadEntry () void JobQueue::threadEntry ()
{ {
@@ -249,19 +241,9 @@ void JobQueue::threadEntry ()
{ {
setCallingThreadName ("waiting"); setCallingThreadName ("waiting");
// bool didIO = false;
while (mJobSet.empty () && !mShuttingDown) while (mJobSet.empty () && !mShuttingDown)
{ {
// if ((mIOThreadCount < mMaxIOThreadCount) && !didIO && !theApp->isShutdown())
// {
// IOThread(sl);
// didIO = true;
// }
// else
// {
mJobCond.wait (sl); mJobCond.wait (sl);
// didIO = false;
// }
} }
if (mJobSet.empty ()) if (mJobSet.empty ())

View File

@@ -43,7 +43,6 @@ public:
private: private:
void threadEntry (); void threadEntry ();
void IOThread (boost::mutex::scoped_lock&);
boost::mutex mJobLock; boost::mutex mJobLock;
boost::condition_variable mJobCond; boost::condition_variable mJobCond;
@@ -54,8 +53,6 @@ private:
int mThreadCount; int mThreadCount;
bool mShuttingDown; bool mShuttingDown;
int mIOThreadCount;
int mMaxIOThreadCount;
boost::asio::io_service& mIOService; boost::asio::io_service& mIOService;
std::map<JobType, std::pair<int, int > > mJobCounts; std::map<JobType, std::pair<int, int > > mJobCounts;