diff --git a/TODO.txt b/TODO.txt index ec7eeb553d..b138053fdf 100644 --- a/TODO.txt +++ b/TODO.txt @@ -40,6 +40,10 @@ David Feature: -------------------------------------------------------------------------------- +- Consolidate databases + +- Figure out why we need WAL sqlite mode if we no longer use sqlite for the node store + - Add "skipped" field to beginTestCase() to disable a test but still record that it was skipped in the output. Like for mdb import. diff --git a/modules/ripple_app/data/ripple_SqliteDatabase.cpp b/modules/ripple_app/data/ripple_SqliteDatabase.cpp index 2ffcb47e97..e88e40dd6d 100644 --- a/modules/ripple_app/data/ripple_SqliteDatabase.cpp +++ b/modules/ripple_app/data/ripple_SqliteDatabase.cpp @@ -33,8 +33,11 @@ SqliteStatement::~SqliteStatement () sqlite3_finalize (statement); } +//------------------------------------------------------------------------------ + SqliteDatabase::SqliteDatabase (const char* host) : Database (host) + , m_thread ("sqlitedb") , mWalQ (NULL) , walRunning (false) { @@ -290,9 +293,13 @@ void SqliteDatabase::doHook (const char* db, int pages) } if (mWalQ) + { mWalQ->addJob (jtWAL, std::string ("WAL:") + mHost, BIND_TYPE (&SqliteDatabase::runWal, this)); + } else - boost::thread (BIND_TYPE (&SqliteDatabase::runWal, this)).detach (); + { + m_thread.call (&SqliteDatabase::runWal, this); + } } void SqliteDatabase::runWal () diff --git a/modules/ripple_app/data/ripple_SqliteDatabase.h b/modules/ripple_app/data/ripple_SqliteDatabase.h index a1bac93b56..29b4aed71f 100644 --- a/modules/ripple_app/data/ripple_SqliteDatabase.h +++ b/modules/ripple_app/data/ripple_SqliteDatabase.h @@ -57,6 +57,8 @@ public: int getKBUsedAll (); private: + ThreadWithCallQueue m_thread; + sqlite3* mConnection; // VFALCO TODO Why do we need an "aux" connection? Should just use a second SqliteDatabase object. sqlite3* mAuxConnection; diff --git a/modules/ripple_app/main/ripple_Application.cpp b/modules/ripple_app/main/ripple_Application.cpp index 239cb67176..d1ea135c8a 100644 --- a/modules/ripple_app/main/ripple_Application.cpp +++ b/modules/ripple_app/main/ripple_Application.cpp @@ -44,6 +44,13 @@ public: // the dtor of m_threads will block until each thread exits. } + // TEMPORARY HACK for compatibility with old code + void runExtraThreads () + { + for (int i = 0; i < m_threads.size (); ++i) + m_threads [i]->start (); + } + // Run on the callers thread. // This will block until stop is issued. void run () @@ -92,7 +99,7 @@ public: : Thread (name) , m_service (service) { - startThread (); + //startThread (); } ~ServiceThread () @@ -102,6 +109,11 @@ public: stopThread (-1); // wait forever } + void start () + { + startThread (); + } + void run () { m_service.run (); @@ -149,7 +161,6 @@ public: , mTempNodeCache ("NodeCache", 16384, 90) , mSLECache ("LedgerEntryCache", 4096, 120) , mSNTPClient (m_auxService) - , mJobQueue (m_mainService) // VFALCO New stuff , m_nodeStore (NodeStore::New ( getConfig ().nodeDatabase, @@ -360,7 +371,266 @@ public: { return mShutdown; } - void setup (); + + //-------------------------------------------------------------------------- + + static DatabaseCon* openDatabaseCon (const char* fileName, + const char* dbInit[], + int dbCount) + { + return new DatabaseCon (fileName, dbInit, dbCount); + } + + void initSqliteDb (int index) + { + switch (index) + { + case 0: mRpcDB = openDatabaseCon ("rpc.db", RpcDBInit, RpcDBCount); break; + case 1: mTxnDB = openDatabaseCon ("transaction.db", TxnDBInit, TxnDBCount); break; + case 2: mLedgerDB = openDatabaseCon ("ledger.db", LedgerDBInit, LedgerDBCount); break; + case 3: mWalletDB = openDatabaseCon ("wallet.db", WalletDBInit, WalletDBCount); break; + }; + } + + // VFALCO TODO Is it really necessary to init the dbs in parallel? + void initSqliteDbs () + { + int const count = 4; + + ThreadGroup threadGroup (count); + ParallelFor (threadGroup).loop (count, &ApplicationImp::initSqliteDb, this); + } + +#ifdef SIGINT + static void sigIntHandler (int) + { + doShutdown = true; + } +#endif + + // VFALCO TODO Break this function up into many small initialization segments. + // Or better yet refactor these initializations into RAII classes + // which are members of the Application object. + // + void setup () + { + // VFALCO NOTE: 0 means use heuristics to determine the thread count. + mJobQueue.setThreadCount (0, getConfig ().RUN_STANDALONE); + + mSweepTimer.expires_from_now (boost::posix_time::seconds (10)); + mSweepTimer.async_wait (BIND_TYPE (&ApplicationImp::sweep, this)); + + m_loadManager->startThread (); + + #if ! BEAST_WIN32 + #ifdef SIGINT + + if (!getConfig ().RUN_STANDALONE) + { + struct sigaction sa; + memset (&sa, 0, sizeof (sa)); + sa.sa_handler = &ApplicationImp::sigIntHandler; + sigaction (SIGINT, &sa, NULL); + } + + #endif + #endif + + assert (mTxnDB == NULL); + + if (!getConfig ().DEBUG_LOGFILE.empty ()) + { + // Let debug messages go to the file but only WARNING or higher to regular output (unless verbose) + Log::setLogFile (getConfig ().DEBUG_LOGFILE); + + if (Log::getMinSeverity () > lsDEBUG) + LogPartition::setSeverity (lsDEBUG); + } + + if (!getConfig ().RUN_STANDALONE) + mSNTPClient.init (getConfig ().SNTP_SERVERS); + + initSqliteDbs (); + + leveldb::Options options; + options.create_if_missing = true; + options.block_cache = leveldb::NewLRUCache (getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024); + + getApp().getLedgerDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") % + (getConfig ().getSize (siLgrDBCache) * 1024))); + getApp().getTxnDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") % + (getConfig ().getSize (siTxnDBCache) * 1024))); + + mTxnDB->getDB ()->setupCheckpointing (&mJobQueue); + mLedgerDB->getDB ()->setupCheckpointing (&mJobQueue); + + if (!getConfig ().RUN_STANDALONE) + updateTables (); + + mFeatures->addInitialFeatures (); + + if (getConfig ().START_UP == Config::FRESH) + { + WriteLog (lsINFO, Application) << "Starting new Ledger"; + + startNewLedger (); + } + else if ((getConfig ().START_UP == Config::LOAD) || (getConfig ().START_UP == Config::REPLAY)) + { + WriteLog (lsINFO, Application) << "Loading specified Ledger"; + + if (!loadOldLedger (getConfig ().START_LEDGER, getConfig ().START_UP == Config::REPLAY)) + { + getApp().stop (); + exit (-1); + } + } + else if (getConfig ().START_UP == Config::NETWORK) + { + // This should probably become the default once we have a stable network + if (!getConfig ().RUN_STANDALONE) + mNetOps->needNetworkLedger (); + + startNewLedger (); + } + else + startNewLedger (); + + mOrderBookDB.setup (getApp().getLedgerMaster ().getCurrentLedger ()); + + // + // Begin validation and ip maintenance. + // - LocalCredentials maintains local information: including identity and network connection persistence information. + // + m_localCredentials.start (); + + // + // Set up UNL. + // + if (!getConfig ().RUN_STANDALONE) + getUNL ().nodeBootstrap (); + + mValidations->tune (getConfig ().getSize (siValidationsSize), getConfig ().getSize (siValidationsAge)); + m_nodeStore->tune (getConfig ().getSize (siNodeCacheSize), getConfig ().getSize (siNodeCacheAge)); + mLedgerMaster.tune (getConfig ().getSize (siLedgerSize), getConfig ().getSize (siLedgerAge)); + mSLECache.setTargetSize (getConfig ().getSize (siSLECacheSize)); + mSLECache.setTargetAge (getConfig ().getSize (siSLECacheAge)); + + mLedgerMaster.setMinValidations (getConfig ().VALIDATION_QUORUM); + + // + // Allow peer connections. + // + if (!getConfig ().RUN_STANDALONE) + { + try + { + mPeerDoor = PeerDoor::New ( + getConfig ().PEER_IP, + getConfig ().PEER_PORT, + getConfig ().PEER_SSL_CIPHER_LIST, + m_mainService); + } + catch (const std::exception& e) + { + // Must run as directed or exit. + WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open peer service: %s") % e.what ()); + + exit (3); + } + } + else + { + WriteLog (lsINFO, Application) << "Peer interface: disabled"; + } + + // + // Allow RPC connections. + // + if (! getConfig ().getRpcIP().empty () && getConfig ().getRpcPort() != 0) + { + try + { + mRPCDoor = new RPCDoor (m_mainService, m_rpcServerHandler); + } + catch (const std::exception& e) + { + // Must run as directed or exit. + WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open RPC service: %s") % e.what ()); + + exit (3); + } + } + else + { + WriteLog (lsINFO, Application) << "RPC interface: disabled"; + } + + // + // Allow private WS connections. + // + if (!getConfig ().WEBSOCKET_IP.empty () && getConfig ().WEBSOCKET_PORT) + { + try + { + mWSPrivateDoor = new WSDoor (getConfig ().WEBSOCKET_IP, getConfig ().WEBSOCKET_PORT, false); + } + catch (const std::exception& e) + { + // Must run as directed or exit. + WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open private websocket service: %s") % e.what ()); + + exit (3); + } + } + else + { + WriteLog (lsINFO, Application) << "WS private interface: disabled"; + } + + // + // Allow public WS connections. + // + if (!getConfig ().WEBSOCKET_PUBLIC_IP.empty () && getConfig ().WEBSOCKET_PUBLIC_PORT) + { + try + { + mWSPublicDoor = new WSDoor (getConfig ().WEBSOCKET_PUBLIC_IP, getConfig ().WEBSOCKET_PUBLIC_PORT, true); + } + catch (const std::exception& e) + { + // Must run as directed or exit. + WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open public websocket service: %s") % e.what ()); + + exit (3); + } + } + else + { + WriteLog (lsINFO, Application) << "WS public interface: disabled"; + } + + // + // Begin connecting to network. + // + if (!getConfig ().RUN_STANDALONE) + mPeers->start (); + + if (getConfig ().RUN_STANDALONE) + { + WriteLog (lsWARNING, Application) << "Running in standalone mode"; + + mNetOps->setStandAlone (); + } + else + { + // VFALCO NOTE the state timer resets the deadlock detector. + // + mNetOps->setStateTimer (); + } + } + + void run (); void stop (); void sweep (); @@ -442,265 +712,14 @@ void ApplicationImp::stop () mShutdown = false; } -static void InitDB (DatabaseCon** dbCon, const char* fileName, const char* dbInit[], int dbCount) -{ - *dbCon = new DatabaseCon (fileName, dbInit, dbCount); -} - -#ifdef SIGINT -void sigIntHandler (int) -{ - doShutdown = true; -} -#endif - -// VFALCO TODO Figure this out it looks like the wrong tool -static void runAux (boost::asio::io_service& svc) -{ - setCallingThreadName ("aux"); - svc.run (); -} - -static void runIO (boost::asio::io_service& io) -{ - setCallingThreadName ("io"); - io.run (); -} - -// VFALCO TODO Break this function up into many small initialization segments. -// Or better yet refactor these initializations into RAII classes -// which are members of the Application object. -// -void ApplicationImp::setup () -{ - // VFALCO NOTE: 0 means use heuristics to determine the thread count. - mJobQueue.setThreadCount (0, getConfig ().RUN_STANDALONE); - - mSweepTimer.expires_from_now (boost::posix_time::seconds (10)); - mSweepTimer.async_wait (BIND_TYPE (&ApplicationImp::sweep, this)); - - m_loadManager->startThread (); - -#if ! BEAST_WIN32 -#ifdef SIGINT - - if (!getConfig ().RUN_STANDALONE) - { - struct sigaction sa; - memset (&sa, 0, sizeof (sa)); - sa.sa_handler = sigIntHandler; - sigaction (SIGINT, &sa, NULL); - } - -#endif -#endif - - assert (mTxnDB == NULL); - - if (!getConfig ().DEBUG_LOGFILE.empty ()) - { - // Let debug messages go to the file but only WARNING or higher to regular output (unless verbose) - Log::setLogFile (getConfig ().DEBUG_LOGFILE); - - if (Log::getMinSeverity () > lsDEBUG) - LogPartition::setSeverity (lsDEBUG); - } - - if (!getConfig ().RUN_STANDALONE) - mSNTPClient.init (getConfig ().SNTP_SERVERS); - - // - // Construct databases. - // - boost::thread t1 (BIND_TYPE (&InitDB, &mRpcDB, "rpc.db", RpcDBInit, RpcDBCount)); - boost::thread t2 (BIND_TYPE (&InitDB, &mTxnDB, "transaction.db", TxnDBInit, TxnDBCount)); - boost::thread t3 (BIND_TYPE (&InitDB, &mLedgerDB, "ledger.db", LedgerDBInit, LedgerDBCount)); - boost::thread t4 (BIND_TYPE (&InitDB, &mWalletDB, "wallet.db", WalletDBInit, WalletDBCount)); - t1.join (); - t2.join (); - t3.join (); - t4.join (); - - leveldb::Options options; - options.create_if_missing = true; - options.block_cache = leveldb::NewLRUCache (getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024); - - getApp().getLedgerDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") % - (getConfig ().getSize (siLgrDBCache) * 1024))); - getApp().getTxnDB ()->getDB ()->executeSQL (boost::str (boost::format ("PRAGMA cache_size=-%d;") % - (getConfig ().getSize (siTxnDBCache) * 1024))); - - mTxnDB->getDB ()->setupCheckpointing (&mJobQueue); - mLedgerDB->getDB ()->setupCheckpointing (&mJobQueue); - - if (!getConfig ().RUN_STANDALONE) - updateTables (); - - mFeatures->addInitialFeatures (); - - if (getConfig ().START_UP == Config::FRESH) - { - WriteLog (lsINFO, Application) << "Starting new Ledger"; - - startNewLedger (); - } - else if ((getConfig ().START_UP == Config::LOAD) || (getConfig ().START_UP == Config::REPLAY)) - { - WriteLog (lsINFO, Application) << "Loading specified Ledger"; - - if (!loadOldLedger (getConfig ().START_LEDGER, getConfig ().START_UP == Config::REPLAY)) - { - getApp().stop (); - exit (-1); - } - } - else if (getConfig ().START_UP == Config::NETWORK) - { - // This should probably become the default once we have a stable network - if (!getConfig ().RUN_STANDALONE) - mNetOps->needNetworkLedger (); - - startNewLedger (); - } - else - startNewLedger (); - - mOrderBookDB.setup (getApp().getLedgerMaster ().getCurrentLedger ()); - - // - // Begin validation and ip maintenance. - // - LocalCredentials maintains local information: including identity and network connection persistence information. - // - m_localCredentials.start (); - - // - // Set up UNL. - // - if (!getConfig ().RUN_STANDALONE) - getUNL ().nodeBootstrap (); - - mValidations->tune (getConfig ().getSize (siValidationsSize), getConfig ().getSize (siValidationsAge)); - m_nodeStore->tune (getConfig ().getSize (siNodeCacheSize), getConfig ().getSize (siNodeCacheAge)); - mLedgerMaster.tune (getConfig ().getSize (siLedgerSize), getConfig ().getSize (siLedgerAge)); - mSLECache.setTargetSize (getConfig ().getSize (siSLECacheSize)); - mSLECache.setTargetAge (getConfig ().getSize (siSLECacheAge)); - - mLedgerMaster.setMinValidations (getConfig ().VALIDATION_QUORUM); - - // - // Allow peer connections. - // - if (!getConfig ().RUN_STANDALONE) - { - try - { - mPeerDoor = PeerDoor::New ( - getConfig ().PEER_IP, - getConfig ().PEER_PORT, - getConfig ().PEER_SSL_CIPHER_LIST, - m_mainService); - } - catch (const std::exception& e) - { - // Must run as directed or exit. - WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open peer service: %s") % e.what ()); - - exit (3); - } - } - else - { - WriteLog (lsINFO, Application) << "Peer interface: disabled"; - } - - // - // Allow RPC connections. - // - if (! getConfig ().getRpcIP().empty () && getConfig ().getRpcPort() != 0) - { - try - { - mRPCDoor = new RPCDoor (m_mainService, m_rpcServerHandler); - } - catch (const std::exception& e) - { - // Must run as directed or exit. - WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open RPC service: %s") % e.what ()); - - exit (3); - } - } - else - { - WriteLog (lsINFO, Application) << "RPC interface: disabled"; - } - - // - // Allow private WS connections. - // - if (!getConfig ().WEBSOCKET_IP.empty () && getConfig ().WEBSOCKET_PORT) - { - try - { - mWSPrivateDoor = WSDoor::createWSDoor (getConfig ().WEBSOCKET_IP, getConfig ().WEBSOCKET_PORT, false); - } - catch (const std::exception& e) - { - // Must run as directed or exit. - WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open private websocket service: %s") % e.what ()); - - exit (3); - } - } - else - { - WriteLog (lsINFO, Application) << "WS private interface: disabled"; - } - - // - // Allow public WS connections. - // - if (!getConfig ().WEBSOCKET_PUBLIC_IP.empty () && getConfig ().WEBSOCKET_PUBLIC_PORT) - { - try - { - mWSPublicDoor = WSDoor::createWSDoor (getConfig ().WEBSOCKET_PUBLIC_IP, getConfig ().WEBSOCKET_PUBLIC_PORT, true); - } - catch (const std::exception& e) - { - // Must run as directed or exit. - WriteLog (lsFATAL, Application) << boost::str (boost::format ("Can not open public websocket service: %s") % e.what ()); - - exit (3); - } - } - else - { - WriteLog (lsINFO, Application) << "WS public interface: disabled"; - } - - // - // Begin connecting to network. - // - if (!getConfig ().RUN_STANDALONE) - mPeers->start (); - - if (getConfig ().RUN_STANDALONE) - { - WriteLog (lsWARNING, Application) << "Running in standalone mode"; - - mNetOps->setStandAlone (); - } - else - { - // VFALCO NOTE the state timer resets the deadlock detector. - // - mNetOps->setStateTimer (); - } -} - void ApplicationImp::run () { + // VFALCO TODO The unit tests crash if we try to + // run these threads in the IoService constructor + // so this hack makes them start later. + // + m_mainService.runExtraThreads (); + if (!getConfig ().RUN_STANDALONE) { // VFALCO NOTE This seems unnecessary. If we properly refactor the load diff --git a/modules/ripple_app/network/WSDoor.cpp b/modules/ripple_app/network/WSDoor.cpp index 968e1cdbaa..7f1ebf1347 100644 --- a/modules/ripple_app/network/WSDoor.cpp +++ b/modules/ripple_app/network/WSDoor.cpp @@ -28,6 +28,7 @@ WSDoor::WSDoor (std::string const& strIp, int iPort, bool bPublic) , mIp (strIp) , mPort (iPort) { + startThread (); } WSDoor::~WSDoor () @@ -39,8 +40,8 @@ WSDoor::~WSDoor () m_endpoint->stop (); } - m_thread.signalThreadShouldExit (); - m_thread.waitForThreadToExit (); + signalThreadShouldExit (); + waitForThreadToExit (); } void WSDoor::run () @@ -110,6 +111,6 @@ void WSDoor::stop () m_endpoint->stop (); } - m_thread.signalThreadShouldExit (); - m_thread.waitForThreadToExit (); + signalThreadShouldExit (); + waitForThreadToExit (); } diff --git a/modules/ripple_app/network/WSDoor.h b/modules/ripple_app/network/WSDoor.h index 567a9cea22..39f64d0d37 100644 --- a/modules/ripple_app/network/WSDoor.h +++ b/modules/ripple_app/network/WSDoor.h @@ -20,7 +20,7 @@ private: void run (); private: - ScopedPointer m_endpoint; + ScopedPointer m_endpoint; CriticalSection m_endpointLock; bool mPublic; std::string mIp; diff --git a/modules/ripple_app/ripple_app.cpp b/modules/ripple_app/ripple_app.cpp index 11ff12915d..f8c11546e2 100644 --- a/modules/ripple_app/ripple_app.cpp +++ b/modules/ripple_app/ripple_app.cpp @@ -60,8 +60,6 @@ #include #include -//#include "../ripple_sqlite/ripple_sqlite.h" // for SqliteDatabase.cpp - #include "../ripple_core/ripple_core.h" #include "beast/modules/beast_db/beast_db.h" diff --git a/modules/ripple_core/functional/ripple_JobQueue.cpp b/modules/ripple_core/functional/ripple_JobQueue.cpp index db80cacc70..c038ae521b 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.cpp +++ b/modules/ripple_core/functional/ripple_JobQueue.cpp @@ -195,7 +195,7 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode) } else if (c == 0) { - c = boost::thread::hardware_concurrency (); + c = SystemStats::getNumCpus (); // VFALCO NOTE According to boost, hardware_concurrency cannot return // negative numbers/ diff --git a/rippled.1 b/rippled.1 deleted file mode 100644 index 8617000b89..0000000000 Binary files a/rippled.1 and /dev/null differ