Split Job and JobQueue and move functions to the .cpp

This commit is contained in:
Vinnie Falco
2013-06-03 21:08:47 -07:00
parent 3fafde063f
commit 8b6c2c0c7a
26 changed files with 416 additions and 380 deletions

View File

@@ -9,8 +9,6 @@
#include <boost/algorithm/string/classification.hpp>
#include <boost/test/unit_test.hpp>
#include "Config.h"
SETUP_LOG (STAmount)
uint64 STAmount::uRateOne = STAmount::getRate(STAmount(1), STAmount(1));

View File

@@ -5,7 +5,6 @@
#include "leveldb/filter_policy.h"
#include "AcceptedLedger.h"
#include "Config.h"
#include "PeerDoor.h"
#include "RPCDoor.h"

View File

@@ -23,7 +23,6 @@
#include "Application.h"
#include "RPC.h"
#include "RPCErr.h"
#include "Config.h"
#include "CallRPC.h"

View File

@@ -8,7 +8,6 @@
#include <boost/algorithm/string.hpp>
#include <algorithm>
#include "Config.h"
#include "Peer.h"
#include "PeerDoor.h"
#include "Application.h"

View File

@@ -10,8 +10,6 @@
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/system/error_code.hpp>
#include "Config.h"
#define CLIENT_MAX_HEADER (32*1024)
SETUP_LOG (HttpsClient)

View File

@@ -1,6 +1,5 @@
#include "Interpreter.h"
#include "Operation.h"
#include "Config.h"
/*
We also need to charge for each op

View File

@@ -1,125 +0,0 @@
#ifndef JOB_QUEUE__H
#define JOB_QUEUE__H
// Note that this queue should only be used for CPU-bound jobs
// It is primarily intended for signature checking
enum JobType
{
// must be in priority order, low to high
jtINVALID = -1,
jtPACK = 1, // Make a fetch pack for a peer
jtPUBOLDLEDGER = 2, // An old ledger has been accepted
jtVALIDATION_ut = 3, // A validation from an untrusted source
jtPROOFWORK = 4, // A proof of work demand from another server
jtPROPOSAL_ut = 5, // A proposal from an untrusted source
jtLEDGER_DATA = 6, // Received data for a ledger we're acquiring
jtUPDATE_PF = 7, // Update pathfinding requests
jtCLIENT = 8, // A websocket command from the client
jtTRANSACTION = 9, // A transaction received from the network
jtPUBLEDGER = 10, // Publish a fully-accepted ledger
jtWAL = 11, // Write-ahead logging
jtVALIDATION_t = 12, // A validation from a trusted source
jtWRITE = 13, // Write out hashed objects
jtTRANSACTION_l = 14, // A local transaction
jtPROPOSAL_t = 15, // A proposal from a trusted source
jtADMIN = 16, // An administrative operation
jtDEATH = 17, // job of death, used internally
// special types not dispatched by the job pool
jtPEER = 24,
jtDISK = 25,
jtACCEPTLEDGER = 26,
jtTXN_PROC = 27,
jtOB_SETUP = 28,
jtPATH_FIND = 29,
jtHO_READ = 30,
jtHO_WRITE = 31,
}; // CAUTION: If you add new types, add them to JobType.cpp too
// VFALCO: TODO, move this into the enum so it calculates itself?
#define NUM_JOB_TYPES 48 // why 48 and not 32?
class Job
{
public:
Job() : mType(jtINVALID), mJobIndex(0) { ; }
Job(JobType type, uint64 index) : mType(type), mJobIndex(index)
{ ; }
Job(JobType type, const std::string& name, uint64 index, LoadMonitor& lm, const FUNCTION_TYPE<void(Job&)>& job)
: mType(type), mJobIndex(index), mJob(job), mName(name)
{
// VFALCO: NOTE, what the heck does this mean?
mLoadMonitor = boost::make_shared<LoadEvent>(boost::ref(lm), name, false);
}
JobType getType() const { return mType; }
void doJob(void) { mLoadMonitor->start(); mJob(*this); mLoadMonitor->reName(mName); }
void rename(const std::string& n) { mName = n; }
bool operator<(const Job& j) const;
bool operator>(const Job& j) const;
bool operator<=(const Job& j) const;
bool operator>=(const Job& j) const;
static const char* toString (JobType);
protected:
JobType mType;
uint64 mJobIndex;
FUNCTION_TYPE<void(Job&)> mJob;
LoadEvent::pointer mLoadMonitor;
std::string mName;
};
class JobQueue
{
public:
explicit JobQueue (boost::asio::io_service&);
void addJob(JobType type, const std::string& name, const FUNCTION_TYPE<void(Job&)>& job);
int getJobCount(JobType t); // Jobs waiting at this priority
int getJobCountTotal(JobType t); // Jobs waiting plus running at this priority
int getJobCountGE(JobType t); // All waiting jobs at or greater than this priority
std::vector< std::pair<JobType, std::pair<int, int> > > getJobCounts(); // jobs waiting, threads doing
void shutdown();
void setThreadCount(int c = 0);
LoadEvent::pointer getLoadEvent(JobType t, const std::string& name)
{
return boost::make_shared<LoadEvent>(boost::ref(mJobLoads[t]), name, true);
}
LoadEvent::autoptr getLoadEventAP(JobType t, const std::string& name)
{
return LoadEvent::autoptr(new LoadEvent(mJobLoads[t], name, true));
}
int isOverloaded();
Json::Value getJson(int c = 0);
private:
void threadEntry();
void IOThread(boost::mutex::scoped_lock&);
boost::mutex mJobLock;
boost::condition_variable mJobCond;
uint64 mLastJob;
std::set <Job> mJobSet;
LoadMonitor mJobLoads [NUM_JOB_TYPES];
int mThreadCount;
bool mShuttingDown;
int mIOThreadCount;
int mMaxIOThreadCount;
boost::asio::io_service& mIOService;
std::map<JobType, std::pair<int, int > > mJobCounts;
};
#endif

View File

@@ -4,7 +4,6 @@
#include <boost/thread.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include "Config.h"
#include "Application.h"
SETUP_LOG (LoadManager)

View File

@@ -7,7 +7,6 @@
#include "Version.h"
#include "Peer.h"
#include "Config.h"
#include "Application.h"
#include "SerializedTransaction.h"

View File

@@ -7,7 +7,6 @@
#include <boost/mem_fn.hpp>
#include "Application.h"
#include "Config.h"
SETUP_LOG (PeerDoor)

View File

@@ -1,6 +1,5 @@
#include "RPCDoor.h"
#include "Application.h"
#include "Config.h"
#include <boost/bind.hpp>
#include <iostream>

View File

@@ -6,8 +6,6 @@
#include <openssl/rand.h>
#include "Config.h"
SETUP_LOG (SNTPClient)
// #define SNTP_DEBUG

View File

@@ -5,11 +5,9 @@
//#include "../websocketpp/src/websocketpp.hpp"
#include "Application.h"
#include "Config.h"
#include "NetworkOPs.h"
#include "WSConnection.h"
#include "WSHandler.h"
#include "Config.h"
#include "WSDoor.h"

View File

@@ -2,7 +2,6 @@
#define __WSHANDLER__
#include "Application.h"
#include "Config.h"
extern void initSSLContext(boost::asio::ssl::context& context,
std::string key_file, std::string cert_file, std::string chain_file);

View File

@@ -8,7 +8,6 @@
#include "Application.h"
#include "CallRPC.h"
#include "Config.h"
#include "RPCHandler.h"
namespace po = boost::program_options;

View File

@@ -1,16 +1,6 @@
//
// TODO: Check permissions on config file before using it.
//
#include <algorithm>
#include <fstream>
#include <iostream>
#include <boost/algorithm/string.hpp>
#include <boost/foreach.hpp>
#include <boost/lexical_cast.hpp>
#include "Config.h"
#include "HashPrefixes.h"
// VFALCO: TODO Rename and replace these macros with variables.
#define SECTION_ACCOUNT_PROBE_MAX "account_probe_max"

View File

@@ -1,20 +1,17 @@
#ifndef __CONFIG__
#define __CONFIG__
#ifndef RIPPLE_CONFIG_H
#define RIPPLE_CONFIG_H
#include <string>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/filesystem.hpp>
// VFALCO: NOTE, Set this to 1 to enable code which is unnecessary
#define ENABLE_INSECURE 0
#include "ParseSection.h"
#define ENABLE_INSECURE 0 // 1, to enable unnecessary features.
// VFALCO: TODO, replace all these macros with language constructs
#define SYSTEM_NAME "ripple"
#define SYSTEM_CURRENCY_CODE "XRP"
#define SYSTEM_CURRENCY_PRECISION 6
#define SYSTEM_CURRENCY_CODE_RIPPLE "XRR"
// VFALCO: TODO Replace these with beast "unsigned long long" generators
#define SYSTEM_CURRENCY_GIFT 1000ull
#define SYSTEM_CURRENCY_USERS 100000000ull
#define SYSTEM_CURRENCY_PARTS 1000000ull // 10^SYSTEM_CURRENCY_PRECISION
@@ -194,11 +191,12 @@ public:
std::string SMS_TO;
std::string SMS_URL;
Config();
public:
Config ();
int getSize(SizedItemName);
void setup(const std::string& strConf, bool bTestNet, bool bQuiet);
void load();
int getSize (SizedItemName);
void setup (const std::string& strConf, bool bTestNet, bool bQuiet);
void load ();
};
extern Config theConfig;

View File

@@ -0,0 +1,122 @@
Job::Job ()
: mType (jtINVALID)
, mJobIndex (0)
{
}
Job::Job (JobType type, uint64 index)
: mType (type)
, mJobIndex (index)
{
}
Job::Job (JobType type,
std::string const& name,
uint64 index,
LoadMonitor& lm,
FUNCTION_TYPE <void (Job&)> const& job)
: mType (type)
, mJobIndex (index)
, mJob (job)
, mName (name)
{
// VFALCO: NOTE, what the heck does this do?
mLoadMonitor = boost::make_shared <LoadEvent> (boost::ref (lm), name, false);
}
JobType Job::getType() const
{
return mType;
}
void Job::doJob ()
{
mLoadMonitor->start();
mJob (*this);
mLoadMonitor->reName(mName);
}
void Job::rename (std::string const& newName)
{
mName = newName;
}
const char* Job::toString (JobType t)
{
switch(t)
{
case jtINVALID: return "invalid";
case jtPACK: return "makeFetchPack";
case jtPUBOLDLEDGER: return "publishAcqLedger";
case jtVALIDATION_ut: return "untrustedValidation";
case jtPROOFWORK: return "proofOfWork";
case jtPROPOSAL_ut: return "untrustedProposal";
case jtLEDGER_DATA: return "ledgerData";
case jtUPDATE_PF: return "updatePaths";
case jtCLIENT: return "clientCommand";
case jtTRANSACTION: return "transaction";
case jtPUBLEDGER: return "publishNewLedger";
case jtVALIDATION_t: return "trustedValidation";
case jtWAL: return "writeAhead";
case jtWRITE: return "writeObjects";
case jtTRANSACTION_l: return "localTransaction";
case jtPROPOSAL_t: return "trustedProposal";
case jtADMIN: return "administration";
case jtDEATH: return "jobOfDeath";
case jtPEER: return "peerCommand";
case jtDISK: return "diskAccess";
case jtACCEPTLEDGER: return "acceptLedger";
case jtTXN_PROC: return "processTransaction";
case jtOB_SETUP: return "orderBookSetup";
case jtPATH_FIND: return "pathFind";
case jtHO_READ: return "nodeRead";
case jtHO_WRITE: return "nodeWrite";
default: assert(false); return "unknown";
}
}
bool Job::operator> (const Job& j) const
{
if (mType < j.mType)
return true;
if (mType > j.mType)
return false;
return mJobIndex > j.mJobIndex;
}
bool Job::operator>= (const Job& j) const
{
if (mType < j.mType)
return true;
if (mType > j.mType)
return false;
return mJobIndex >= j.mJobIndex;
}
bool Job::operator< (const Job& j) const
{
if (mType < j.mType)
return false;
if (mType > j.mType)
return true;
return mJobIndex < j.mJobIndex;
}
bool Job::operator<= (const Job& j) const
{
if (mType < j.mType)
return false;
if (mType > j.mType)
return true;
return mJobIndex <= j.mJobIndex;
}

View File

@@ -0,0 +1,81 @@
#ifndef RIPPLE_JOB_H
#define RIPPLE_JOB_H
// Note that this queue should only be used for CPU-bound jobs
// It is primarily intended for signature checking
enum JobType
{
// must be in priority order, low to high
jtINVALID = -1,
jtPACK = 1, // Make a fetch pack for a peer
jtPUBOLDLEDGER = 2, // An old ledger has been accepted
jtVALIDATION_ut = 3, // A validation from an untrusted source
jtPROOFWORK = 4, // A proof of work demand from another server
jtPROPOSAL_ut = 5, // A proposal from an untrusted source
jtLEDGER_DATA = 6, // Received data for a ledger we're acquiring
jtUPDATE_PF = 7, // Update pathfinding requests
jtCLIENT = 8, // A websocket command from the client
jtTRANSACTION = 9, // A transaction received from the network
jtPUBLEDGER = 10, // Publish a fully-accepted ledger
jtWAL = 11, // Write-ahead logging
jtVALIDATION_t = 12, // A validation from a trusted source
jtWRITE = 13, // Write out hashed objects
jtTRANSACTION_l = 14, // A local transaction
jtPROPOSAL_t = 15, // A proposal from a trusted source
jtADMIN = 16, // An administrative operation
jtDEATH = 17, // job of death, used internally
// special types not dispatched by the job pool
jtPEER = 24,
jtDISK = 25,
jtACCEPTLEDGER = 26,
jtTXN_PROC = 27,
jtOB_SETUP = 28,
jtPATH_FIND = 29,
jtHO_READ = 30,
jtHO_WRITE = 31,
}; // CAUTION: If you add new types, add them to JobType.cpp too
// VFALCO: TODO, move this into the enum so it calculates itself?
#define NUM_JOB_TYPES 48 // why 48 and not 32?
class Job
{
public:
// VFALCO: TODO, find out why these extra constructors are needed
Job();
Job (JobType type, uint64 index);
// VFALCO: TODO, try to remove the dependency on LoadMonitor.
Job (JobType type,
std::string const& name,
uint64 index,
LoadMonitor& lm,
FUNCTION_TYPE <void (Job&)> const& job);
JobType getType() const;
void doJob ();
void rename (const std::string& n);
// These comparison operators make the jobs sort in priority order in the job set
bool operator< (const Job& j) const;
bool operator> (const Job& j) const;
bool operator<= (const Job& j) const;
bool operator>= (const Job& j) const;
static const char* toString (JobType);
private:
JobType mType;
uint64 mJobIndex;
FUNCTION_TYPE <void (Job&)> mJob;
// VFALCO: TODO, why is this called mLoadMonitor if the type is LoadEvent pointer?
LoadEvent::pointer mLoadMonitor;
std::string mName;
};
#endif

View File

@@ -3,8 +3,6 @@
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include "Config.h"
SETUP_LOG (JobQueue)
JobQueue::JobQueue(boost::asio::io_service& svc)
@@ -28,78 +26,6 @@ JobQueue::JobQueue(boost::asio::io_service& svc)
mJobLoads[jtACCEPTLEDGER].setTargetLatency(1000, 2500);
}
const char* Job::toString(JobType t)
{
switch(t)
{
case jtINVALID: return "invalid";
case jtPACK: return "makeFetchPack";
case jtPUBOLDLEDGER: return "publishAcqLedger";
case jtVALIDATION_ut: return "untrustedValidation";
case jtPROOFWORK: return "proofOfWork";
case jtPROPOSAL_ut: return "untrustedProposal";
case jtLEDGER_DATA: return "ledgerData";
case jtUPDATE_PF: return "updatePaths";
case jtCLIENT: return "clientCommand";
case jtTRANSACTION: return "transaction";
case jtPUBLEDGER: return "publishNewLedger";
case jtVALIDATION_t: return "trustedValidation";
case jtWAL: return "writeAhead";
case jtWRITE: return "writeObjects";
case jtTRANSACTION_l: return "localTransaction";
case jtPROPOSAL_t: return "trustedProposal";
case jtADMIN: return "administration";
case jtDEATH: return "jobOfDeath";
case jtPEER: return "peerCommand";
case jtDISK: return "diskAccess";
case jtACCEPTLEDGER: return "acceptLedger";
case jtTXN_PROC: return "processTransaction";
case jtOB_SETUP: return "orderBookSetup";
case jtPATH_FIND: return "pathFind";
case jtHO_READ: return "nodeRead";
case jtHO_WRITE: return "nodeWrite";
default: assert(false); return "unknown";
}
}
bool Job::operator>(const Job& j) const
{ // These comparison operators make the jobs sort in priority order in the job set
if (mType < j.mType)
return true;
if (mType > j.mType)
return false;
return mJobIndex > j.mJobIndex;
}
bool Job::operator>=(const Job& j) const
{
if (mType < j.mType)
return true;
if (mType > j.mType)
return false;
return mJobIndex >= j.mJobIndex;
}
bool Job::operator<(const Job& j) const
{
if (mType < j.mType)
return false;
if (mType > j.mType)
return true;
return mJobIndex < j.mJobIndex;
}
bool Job::operator<=(const Job& j) const
{
if (mType < j.mType)
return false;
if (mType > j.mType)
return true;
return mJobIndex <= j.mJobIndex;
}
void JobQueue::addJob(JobType type, const std::string& name, const FUNCTION_TYPE<void(Job&)>& jobFunc)
{
assert(type != jtINVALID);

View File

@@ -0,0 +1,52 @@
#ifndef RIPPLE_JOBQUEUE_H
#define RIPPLE_JOBQUEUE_H
class JobQueue
{
public:
explicit JobQueue (boost::asio::io_service&);
void addJob(JobType type, const std::string& name, const FUNCTION_TYPE<void(Job&)>& job);
int getJobCount(JobType t); // Jobs waiting at this priority
int getJobCountTotal(JobType t); // Jobs waiting plus running at this priority
int getJobCountGE(JobType t); // All waiting jobs at or greater than this priority
std::vector< std::pair<JobType, std::pair<int, int> > > getJobCounts(); // jobs waiting, threads doing
void shutdown();
void setThreadCount(int c = 0);
LoadEvent::pointer getLoadEvent(JobType t, const std::string& name)
{
return boost::make_shared<LoadEvent>(boost::ref(mJobLoads[t]), name, true);
}
LoadEvent::autoptr getLoadEventAP(JobType t, const std::string& name)
{
return LoadEvent::autoptr(new LoadEvent(mJobLoads[t], name, true));
}
int isOverloaded();
Json::Value getJson(int c = 0);
private:
void threadEntry();
void IOThread(boost::mutex::scoped_lock&);
boost::mutex mJobLock;
boost::condition_variable mJobCond;
uint64 mLastJob;
std::set <Job> mJobSet;
LoadMonitor mJobLoads [NUM_JOB_TYPES];
int mThreadCount;
bool mShuttingDown;
int mIOThreadCount;
int mMaxIOThreadCount;
boost::asio::io_service& mIOService;
std::map<JobType, std::pair<int, int > > mJobCounts;
};
#endif

View File

@@ -1,7 +1,6 @@
//#include <boost/test/unit_test.hpp>
//#include <boost/thread.hpp>
//#include <boost/date_time/posix_time/posix_time.hpp>
//#include "Config.h"
//#include "Application.h"
class LoadFeeTrack : public ILoadFeeTrack

View File

@@ -9,7 +9,6 @@
#include <openssl/evp.h>
#include "RPC.h"
#include "Config.h"
#include "Version.h"
// Used for logging