Replace some boost::thread with beast::thread

This commit is contained in:
Vinnie Falco
2013-07-29 01:39:09 -07:00
parent ae4071d1e0
commit e9a7f6f81a
9 changed files with 300 additions and 348 deletions

View File

@@ -40,6 +40,10 @@ David Feature:
--------------------------------------------------------------------------------
- Consolidate databases
- Figure out why we need WAL sqlite mode if we no longer use sqlite for the node store
- Add "skipped" field to beginTestCase() to disable a test but still record
that it was skipped in the output. Like for mdb import.

View File

@@ -33,8 +33,11 @@ SqliteStatement::~SqliteStatement ()
sqlite3_finalize (statement);
}
//------------------------------------------------------------------------------
SqliteDatabase::SqliteDatabase (const char* host)
: Database (host)
, m_thread ("sqlitedb")
, mWalQ (NULL)
, walRunning (false)
{
@@ -290,9 +293,13 @@ void SqliteDatabase::doHook (const char* db, int pages)
}
if (mWalQ)
{
mWalQ->addJob (jtWAL, std::string ("WAL:") + mHost, BIND_TYPE (&SqliteDatabase::runWal, this));
}
else
boost::thread (BIND_TYPE (&SqliteDatabase::runWal, this)).detach ();
{
m_thread.call (&SqliteDatabase::runWal, this);
}
}
void SqliteDatabase::runWal ()

View File

@@ -57,6 +57,8 @@ public:
int getKBUsedAll ();
private:
ThreadWithCallQueue m_thread;
sqlite3* mConnection;
// VFALCO TODO Why do we need an "aux" connection? Should just use a second SqliteDatabase object.
sqlite3* mAuxConnection;

View File

@@ -44,6 +44,13 @@ public:
// the dtor of m_threads will block until each thread exits.
}
// TEMPORARY HACK for compatibility with old code
void runExtraThreads ()
{
for (int i = 0; i < m_threads.size (); ++i)
m_threads [i]->start ();
}
// Run on the callers thread.
// This will block until stop is issued.
void run ()
@@ -92,7 +99,7 @@ public:
: Thread (name)
, m_service (service)
{
startThread ();
//startThread ();
}
~ServiceThread ()
@@ -102,6 +109,11 @@ public:
stopThread (-1); // wait forever
}
void start ()
{
startThread ();
}
void run ()
{
m_service.run ();
@@ -149,7 +161,6 @@ public:
, mTempNodeCache ("NodeCache", 16384, 90)
, mSLECache ("LedgerEntryCache", 4096, 120)
, mSNTPClient (m_auxService)
, mJobQueue (m_mainService)
// VFALCO New stuff
, m_nodeStore (NodeStore::New (
getConfig ().nodeDatabase,
@@ -360,7 +371,266 @@ public:
{
return mShutdown;
}
void setup ();
//--------------------------------------------------------------------------
static DatabaseCon* openDatabaseCon (const char* fileName,
const char* dbInit[],
int dbCount)
{
return new DatabaseCon (fileName, dbInit, dbCount);
}
void initSqliteDb (int index)
{
switch (index)
{
case 0: mRpcDB = openDatabaseCon ("rpc.db", RpcDBInit, RpcDBCount); break;
case 1: mTxnDB = openDatabaseCon ("transaction.db", TxnDBInit, TxnDBCount); break;
case 2: mLedgerDB = openDatabaseCon ("ledger.db", LedgerDBInit, LedgerDBCount); break;
case 3: mWalletDB = openDatabaseCon ("wallet.db", WalletDBInit, WalletDBCount); break;
};
}
// VFALCO TODO Is it really necessary to init the dbs in parallel?
void initSqliteDbs ()
{
int const count = 4;
ThreadGroup threadGroup (count);
ParallelFor (threadGroup).loop (count, &ApplicationImp::initSqliteDb, this);
}
#ifdef SIGINT
static void sigIntHandler (int)
{
doShutdown = true;
}
#endif
// VFALCO TODO Break this function up into many small initialization segments.
// Or better yet refactor these initializations into RAII classes
// which are members of the Application object.
//
void setup ()
{
// VFALCO NOTE: 0 means use heuristics to determine the thread count.
mJobQueue.setThreadCount (0, getConfig ().RUN_STANDALONE);
mSweepTimer.expires_from_now (boost::posix_time::seconds (10));
mSweepTimer.async_wait (BIND_TYPE (&ApplicationImp::sweep, this));
m_loadManager->startThread ();
#if ! BEAST_WIN32
#ifdef SIGINT
if (!getConfig ().RUN_STANDALONE)
{
struct sigaction sa;
memset (&sa, 0, sizeof (sa));
sa.sa_handler = &ApplicationImp::sigIntHandler;
sigaction (SIGINT, &sa, NULL);
}
#endif
#endif
assert (mTxnDB == NULL);
if (!getConfig ().DEBUG_LOGFILE.empty ())
{
// Let debug messages go to the file but only WARNING or higher to regular output (unless verbose)
Log::setLogFile (getConfig ().DEBUG_LOGFILE);
if (Log::getMinSeverity () > lsDEBUG)
LogPartition::setSeverity (lsDEBUG);
}
if (!getConfig ().RUN_STANDALONE)
mSNTPClient.init (getConfig ().SNTP_SERVERS);
initSqliteDbs ();
leveldb::Options options;
options.create_if_missing = true;
options.block_cache = leveldb::NewLRUCache (getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024);
getApp().getLedgerDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
(getConfig ().getSize (siLgrDBCache) * 1024)));
getApp().getTxnDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
(getConfig ().getSize (siTxnDBCache) * 1024)));
mTxnDB->getDB ()->setupCheckpointing (&mJobQueue);
mLedgerDB->getDB ()->setupCheckpointing (&mJobQueue);
if (!getConfig ().RUN_STANDALONE)
updateTables ();
mFeatures->addInitialFeatures ();
if (getConfig ().START_UP == Config::FRESH)
{
WriteLog (lsINFO, Application) << "Starting new Ledger";
startNewLedger ();
}
else if ((getConfig ().START_UP == Config::LOAD) || (getConfig ().START_UP == Config::REPLAY))
{
WriteLog (lsINFO, Application) << "Loading specified Ledger";
if (!loadOldLedger (getConfig ().START_LEDGER, getConfig ().START_UP == Config::REPLAY))
{
getApp().stop ();
exit (-1);
}
}
else if (getConfig ().START_UP == Config::NETWORK)
{
// This should probably become the default once we have a stable network
if (!getConfig ().RUN_STANDALONE)
mNetOps->needNetworkLedger ();
startNewLedger ();
}
else
startNewLedger ();
mOrderBookDB.setup (getApp().getLedgerMaster ().getCurrentLedger ());
//
// Begin validation and ip maintenance.
// - LocalCredentials maintains local information: including identity and network connection persistence information.
//
m_localCredentials.start ();
//
// Set up UNL.
//
if (!getConfig ().RUN_STANDALONE)
getUNL ().nodeBootstrap ();
mValidations->tune (getConfig ().getSize (siValidationsSize), getConfig ().getSize (siValidationsAge));
m_nodeStore->tune (getConfig ().getSize (siNodeCacheSize), getConfig ().getSize (siNodeCacheAge));
mLedgerMaster.tune (getConfig ().getSize (siLedgerSize), getConfig ().getSize (siLedgerAge));
mSLECache.setTargetSize (getConfig ().getSize (siSLECacheSize));
mSLECache.setTargetAge (getConfig ().getSize (siSLECacheAge));
mLedgerMaster.setMinValidations (getConfig ().VALIDATION_QUORUM);
//
// Allow peer connections.
//
if (!getConfig ().RUN_STANDALONE)
{
try
{
mPeerDoor = PeerDoor::New (
getConfig ().PEER_IP,
getConfig ().PEER_PORT,
getConfig ().PEER_SSL_CIPHER_LIST,
m_mainService);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open peer service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "Peer interface: disabled";
}
//
// Allow RPC connections.
//
if (! getConfig ().getRpcIP().empty () && getConfig ().getRpcPort() != 0)
{
try
{
mRPCDoor = new RPCDoor (m_mainService, m_rpcServerHandler);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open RPC service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "RPC interface: disabled";
}
//
// Allow private WS connections.
//
if (!getConfig ().WEBSOCKET_IP.empty () && getConfig ().WEBSOCKET_PORT)
{
try
{
mWSPrivateDoor = new WSDoor (getConfig ().WEBSOCKET_IP, getConfig ().WEBSOCKET_PORT, false);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open private websocket service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "WS private interface: disabled";
}
//
// Allow public WS connections.
//
if (!getConfig ().WEBSOCKET_PUBLIC_IP.empty () && getConfig ().WEBSOCKET_PUBLIC_PORT)
{
try
{
mWSPublicDoor = new WSDoor (getConfig ().WEBSOCKET_PUBLIC_IP, getConfig ().WEBSOCKET_PUBLIC_PORT, true);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open public websocket service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "WS public interface: disabled";
}
//
// Begin connecting to network.
//
if (!getConfig ().RUN_STANDALONE)
mPeers->start ();
if (getConfig ().RUN_STANDALONE)
{
WriteLog (lsWARNING, Application) << "Running in standalone mode";
mNetOps->setStandAlone ();
}
else
{
// VFALCO NOTE the state timer resets the deadlock detector.
//
mNetOps->setStateTimer ();
}
}
void run ();
void stop ();
void sweep ();
@@ -442,265 +712,14 @@ void ApplicationImp::stop ()
mShutdown = false;
}
static void InitDB (DatabaseCon** dbCon, const char* fileName, const char* dbInit[], int dbCount)
{
*dbCon = new DatabaseCon (fileName, dbInit, dbCount);
}
#ifdef SIGINT
void sigIntHandler (int)
{
doShutdown = true;
}
#endif
// VFALCO TODO Figure this out it looks like the wrong tool
static void runAux (boost::asio::io_service& svc)
{
setCallingThreadName ("aux");
svc.run ();
}
static void runIO (boost::asio::io_service& io)
{
setCallingThreadName ("io");
io.run ();
}
// VFALCO TODO Break this function up into many small initialization segments.
// Or better yet refactor these initializations into RAII classes
// which are members of the Application object.
//
void ApplicationImp::setup ()
{
// VFALCO NOTE: 0 means use heuristics to determine the thread count.
mJobQueue.setThreadCount (0, getConfig ().RUN_STANDALONE);
mSweepTimer.expires_from_now (boost::posix_time::seconds (10));
mSweepTimer.async_wait (BIND_TYPE (&ApplicationImp::sweep, this));
m_loadManager->startThread ();
#if ! BEAST_WIN32
#ifdef SIGINT
if (!getConfig ().RUN_STANDALONE)
{
struct sigaction sa;
memset (&sa, 0, sizeof (sa));
sa.sa_handler = sigIntHandler;
sigaction (SIGINT, &sa, NULL);
}
#endif
#endif
assert (mTxnDB == NULL);
if (!getConfig ().DEBUG_LOGFILE.empty ())
{
// Let debug messages go to the file but only WARNING or higher to regular output (unless verbose)
Log::setLogFile (getConfig ().DEBUG_LOGFILE);
if (Log::getMinSeverity () > lsDEBUG)
LogPartition::setSeverity (lsDEBUG);
}
if (!getConfig ().RUN_STANDALONE)
mSNTPClient.init (getConfig ().SNTP_SERVERS);
//
// Construct databases.
//
boost::thread t1 (BIND_TYPE (&InitDB, &mRpcDB, "rpc.db", RpcDBInit, RpcDBCount));
boost::thread t2 (BIND_TYPE (&InitDB, &mTxnDB, "transaction.db", TxnDBInit, TxnDBCount));
boost::thread t3 (BIND_TYPE (&InitDB, &mLedgerDB, "ledger.db", LedgerDBInit, LedgerDBCount));
boost::thread t4 (BIND_TYPE (&InitDB, &mWalletDB, "wallet.db", WalletDBInit, WalletDBCount));
t1.join ();
t2.join ();
t3.join ();
t4.join ();
leveldb::Options options;
options.create_if_missing = true;
options.block_cache = leveldb::NewLRUCache (getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024);
getApp().getLedgerDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
(getConfig ().getSize (siLgrDBCache) * 1024)));
getApp().getTxnDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") %
(getConfig ().getSize (siTxnDBCache) * 1024)));
mTxnDB->getDB ()->setupCheckpointing (&mJobQueue);
mLedgerDB->getDB ()->setupCheckpointing (&mJobQueue);
if (!getConfig ().RUN_STANDALONE)
updateTables ();
mFeatures->addInitialFeatures ();
if (getConfig ().START_UP == Config::FRESH)
{
WriteLog (lsINFO, Application) << "Starting new Ledger";
startNewLedger ();
}
else if ((getConfig ().START_UP == Config::LOAD) || (getConfig ().START_UP == Config::REPLAY))
{
WriteLog (lsINFO, Application) << "Loading specified Ledger";
if (!loadOldLedger (getConfig ().START_LEDGER, getConfig ().START_UP == Config::REPLAY))
{
getApp().stop ();
exit (-1);
}
}
else if (getConfig ().START_UP == Config::NETWORK)
{
// This should probably become the default once we have a stable network
if (!getConfig ().RUN_STANDALONE)
mNetOps->needNetworkLedger ();
startNewLedger ();
}
else
startNewLedger ();
mOrderBookDB.setup (getApp().getLedgerMaster ().getCurrentLedger ());
//
// Begin validation and ip maintenance.
// - LocalCredentials maintains local information: including identity and network connection persistence information.
//
m_localCredentials.start ();
//
// Set up UNL.
//
if (!getConfig ().RUN_STANDALONE)
getUNL ().nodeBootstrap ();
mValidations->tune (getConfig ().getSize (siValidationsSize), getConfig ().getSize (siValidationsAge));
m_nodeStore->tune (getConfig ().getSize (siNodeCacheSize), getConfig ().getSize (siNodeCacheAge));
mLedgerMaster.tune (getConfig ().getSize (siLedgerSize), getConfig ().getSize (siLedgerAge));
mSLECache.setTargetSize (getConfig ().getSize (siSLECacheSize));
mSLECache.setTargetAge (getConfig ().getSize (siSLECacheAge));
mLedgerMaster.setMinValidations (getConfig ().VALIDATION_QUORUM);
//
// Allow peer connections.
//
if (!getConfig ().RUN_STANDALONE)
{
try
{
mPeerDoor = PeerDoor::New (
getConfig ().PEER_IP,
getConfig ().PEER_PORT,
getConfig ().PEER_SSL_CIPHER_LIST,
m_mainService);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open peer service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "Peer interface: disabled";
}
//
// Allow RPC connections.
//
if (! getConfig ().getRpcIP().empty () && getConfig ().getRpcPort() != 0)
{
try
{
mRPCDoor = new RPCDoor (m_mainService, m_rpcServerHandler);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open RPC service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "RPC interface: disabled";
}
//
// Allow private WS connections.
//
if (!getConfig ().WEBSOCKET_IP.empty () && getConfig ().WEBSOCKET_PORT)
{
try
{
mWSPrivateDoor = WSDoor::createWSDoor (getConfig ().WEBSOCKET_IP, getConfig ().WEBSOCKET_PORT, false);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open private websocket service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "WS private interface: disabled";
}
//
// Allow public WS connections.
//
if (!getConfig ().WEBSOCKET_PUBLIC_IP.empty () && getConfig ().WEBSOCKET_PUBLIC_PORT)
{
try
{
mWSPublicDoor = WSDoor::createWSDoor (getConfig ().WEBSOCKET_PUBLIC_IP, getConfig ().WEBSOCKET_PUBLIC_PORT, true);
}
catch (const std::exception& e)
{
// Must run as directed or exit.
WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open public websocket service: %s") % e.what ());
exit (3);
}
}
else
{
WriteLog (lsINFO, Application) << "WS public interface: disabled";
}
//
// Begin connecting to network.
//
if (!getConfig ().RUN_STANDALONE)
mPeers->start ();
if (getConfig ().RUN_STANDALONE)
{
WriteLog (lsWARNING, Application) << "Running in standalone mode";
mNetOps->setStandAlone ();
}
else
{
// VFALCO NOTE the state timer resets the deadlock detector.
//
mNetOps->setStateTimer ();
}
}
void ApplicationImp::run ()
{
// VFALCO TODO The unit tests crash if we try to
// run these threads in the IoService constructor
// so this hack makes them start later.
//
m_mainService.runExtraThreads ();
if (!getConfig ().RUN_STANDALONE)
{
// VFALCO NOTE This seems unnecessary. If we properly refactor the load

View File

@@ -28,6 +28,7 @@ WSDoor::WSDoor (std::string const& strIp, int iPort, bool bPublic)
, mIp (strIp)
, mPort (iPort)
{
startThread ();
}
WSDoor::~WSDoor ()
@@ -39,8 +40,8 @@ WSDoor::~WSDoor ()
m_endpoint->stop ();
}
m_thread.signalThreadShouldExit ();
m_thread.waitForThreadToExit ();
signalThreadShouldExit ();
waitForThreadToExit ();
}
void WSDoor::run ()
@@ -110,6 +111,6 @@ void WSDoor::stop ()
m_endpoint->stop ();
}
m_thread.signalThreadShouldExit ();
m_thread.waitForThreadToExit ();
signalThreadShouldExit ();
waitForThreadToExit ();
}

View File

@@ -20,7 +20,7 @@ private:
void run ();
private:
ScopedPointer <websocketpp::server_autotls*> m_endpoint;
ScopedPointer <websocketpp::server_autotls> m_endpoint;
CriticalSection m_endpointLock;
bool mPublic;
std::string mIp;

View File

@@ -60,8 +60,6 @@
#include <boost/unordered_set.hpp>
#include <boost/weak_ptr.hpp>
//#include "../ripple_sqlite/ripple_sqlite.h" // for SqliteDatabase.cpp
#include "../ripple_core/ripple_core.h"
#include "beast/modules/beast_db/beast_db.h"

View File

@@ -195,7 +195,7 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
}
else if (c == 0)
{
c = boost::thread::hardware_concurrency ();
c = SystemStats::getNumCpus ();
// VFALCO NOTE According to boost, hardware_concurrency cannot return
// negative numbers/

View File

@@ -1,79 +0,0 @@
.\"Modified from man(1) of FreeBSD, the NetBSD mdoc.template, and mdoc.samples.
.\"See Also:
.\"man mdoc.samples for a complete listing of options
.\"man mdoc for the short list of editing options
.\"/usr/share/misc/mdoc.template
.Dd 12/18/12 \" DATE
.Dt rippled 1 \" Program name and manual section number
.Os Darwin
.Sh NAME \" Section Header - required - don't modify
.Nm rippled,
.\" The following lines are read in generating the apropos(man -k) database. Use only key
.\" words here as the database is built based on the words here and in the .ND line.
.Nm Other_name_for_same_program(),
.Nm Yet another name for the same program.
.\" Use .Nm macro to designate other names for the documented program.
.Nd This line parsed for whatis database.
.Sh SYNOPSIS \" Section Header - required - don't modify
.Nm
.Op Fl abcd \" [-abcd]
.Op Fl a Ar path \" [-a path]
.Op Ar file \" [file]
.Op Ar \" [file ...]
.Ar arg0 \" Underlined argument - use .Ar anywhere to underline
arg2 ... \" Arguments
.Sh DESCRIPTION \" Section Header - required - don't modify
Use the .Nm macro to refer to your program throughout the man page like such:
.Nm
Underlining is accomplished with the .Ar macro like this:
.Ar underlined text .
.Pp \" Inserts a space
A list of items with descriptions:
.Bl -tag -width -indent \" Begins a tagged list
.It item a \" Each item preceded by .It macro
Description of item a
.It item b
Description of item b
.El \" Ends the list
.Pp
A list of flags and their descriptions:
.Bl -tag -width -indent \" Differs from above in tag removed
.It Fl a \"-a flag as a list item
Description of -a flag
.It Fl b
Description of -b flag
.El \" Ends the list
.Pp
.\" .Sh ENVIRONMENT \" May not be needed
.\" .Bl -tag -width "ENV_VAR_1" -indent \" ENV_VAR_1 is width of the string ENV_VAR_1
.\" .It Ev ENV_VAR_1
.\" Description of ENV_VAR_1
.\" .It Ev ENV_VAR_2
.\" Description of ENV_VAR_2
.\" .El
.Sh FILES \" File used or created by the topic of the man page
.Bl -tag -width "/Users/joeuser/Library/really_long_file_name" -compact
.It Pa /usr/share/file_name
FILE_1 description
.It Pa /Users/joeuser/Library/really_long_file_name
FILE_2 description
.El \" Ends the list
.\" .Sh DIAGNOSTICS \" May not be needed
.\" .Bl -diag
.\" .It Diagnostic Tag
.\" Diagnostic informtion here.
.\" .It Diagnostic Tag
.\" Diagnostic informtion here.
.\" .El
.Sh SEE ALSO
.\" List links in ascending order by section, alphabetically within a section.
.\" Please do not reference files that do not exist without filing a bug report
.Xr a 1 ,
.Xr b 1 ,
.Xr c 1 ,
.Xr a 2 ,
.Xr b 2 ,
.Xr a 3 ,
.Xr b 3
.\" .Sh BUGS \" Document known, unremedied bugs
.\" .Sh HISTORY \" Document history if command behaves in a unique manner