From 7110aacabcbd429d71d1de51fe1677034a54c1a9 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sun, 28 Jul 2013 14:58:03 -0700 Subject: [PATCH] Use beast::Thread for io_services Conflicts: TODO.txt --- TODO.txt | 10 ++ .../ripple_app/main/ripple_Application.cpp | 149 ++++++++++++++---- 2 files changed, 132 insertions(+), 27 deletions(-) diff --git a/TODO.txt b/TODO.txt index 99d3f92862..ec7eeb553d 100644 --- a/TODO.txt +++ b/TODO.txt @@ -43,6 +43,16 @@ David Feature: - Add "skipped" field to beginTestCase() to disable a test but still record that it was skipped in the output. Like for mdb import. +- use beast DeadlineTimer for sweep in Application + +- Make SNTP Client have its own io_service + +- Get rid of 'ref' typedefs that really mean const& + +- Use secp256k1 from beast + +- Fix xsd/dtd line in JUnit XML output + - Get rid of the WriteLog() stuff in the ripple tests and make it report the message directly to the UnitTest object. Then update the JUnit XML output routines to also write the auxiliary messages. diff --git a/modules/ripple_app/main/ripple_Application.cpp b/modules/ripple_app/main/ripple_Application.cpp index abfd68c3fc..239cb67176 100644 --- a/modules/ripple_app/main/ripple_Application.cpp +++ b/modules/ripple_app/main/ripple_Application.cpp @@ -20,6 +20,106 @@ class ApplicationImp , LeakChecked { public: + // RAII container for a boost::asio::io_service run by beast threads + class IoServiceThread + { + public: + IoServiceThread (String const& name, + int expectedConcurrency, + int numberOfExtraThreads = 0) + : m_name (name) + , m_service (expectedConcurrency) + , m_work (m_service) + { + m_threads.ensureStorageAllocated (numberOfExtraThreads); + + for (int i = 0; i < numberOfExtraThreads; ++i) + m_threads.add (new ServiceThread (m_name, m_service)); + } + + ~IoServiceThread () + { + m_service.stop (); + + // the dtor of m_threads will block until each thread exits. + } + + // Run on the callers thread. + // This will block until stop is issued. + void run () + { + Thread const* const currentThread (Thread::getCurrentThread()); + + String previousThreadName; + + if (currentThread != nullptr) + { + previousThreadName = currentThread->getThreadName (); + } + else + { + // we're on the main thread + previousThreadName = "main"; // for vanity + } + + Thread::setCurrentThreadName (m_name); + + m_service.run (); + + Thread::setCurrentThreadName (previousThreadName); + } + + void stop () + { + m_service.stop (); + } + + boost::asio::io_service& getService () + { + return m_service; + } + + operator boost::asio::io_service& () + { + return m_service; + } + + private: + class ServiceThread : Thread + { + public: + explicit ServiceThread (String const& name, boost::asio::io_service& service) + : Thread (name) + , m_service (service) + { + startThread (); + } + + ~ServiceThread () + { + m_service.stop (); + + stopThread (-1); // wait forever + } + + void run () + { + m_service.run (); + } + + private: + boost::asio::io_service& m_service; + }; + + private: + String const m_name; + boost::asio::io_service m_service; + boost::asio::io_service::work m_work; + OwnedArray m_threads; + }; + + //-------------------------------------------------------------------------- + static ApplicationImp* createInstance () { return new ApplicationImp; @@ -40,14 +140,16 @@ public: // : SharedSingleton (SingletonLifetime::neverDestroyed) #endif - , mIOService ((getConfig ().NODE_SIZE >= 2) ? 2 : 1) - , mIOWork (mIOService) + , m_mainService ("io", + (getConfig ().NODE_SIZE >= 2) ? 2 : 1, + (getConfig ().NODE_SIZE >= 2) ? 1 : 0) + , m_auxService ("auxio", 1, 1) , mNetOps (new NetworkOPs (&mLedgerMaster)) , m_rpcServerHandler (*mNetOps) , mTempNodeCache ("NodeCache", 16384, 90) , mSLECache ("LedgerEntryCache", 4096, 120) - , mSNTPClient (mAuxService) - , mJobQueue (mIOService) + , mSNTPClient (m_auxService) + , mJobQueue (m_mainService) // VFALCO New stuff , m_nodeStore (NodeStore::New ( getConfig ().nodeDatabase, @@ -61,7 +163,7 @@ public: , mValidations (IValidations::New ()) , mUNL (UniqueNodeList::New ()) , mProofOfWorkFactory (IProofOfWorkFactory::New ()) - , mPeers (IPeers::New (mIOService)) + , mPeers (IPeers::New (m_mainService)) , m_loadManager (ILoadManager::New ()) // VFALCO End new stuff // VFALCO TODO replace all NULL with nullptr @@ -73,7 +175,7 @@ public: , mRPCDoor (NULL) , mWSPublicDoor (NULL) , mWSPrivateDoor (NULL) - , mSweepTimer (mAuxService) + , mSweepTimer (m_auxService) , mShutdown (false) { // VFALCO TODO remove these once the call is thread safe. @@ -119,7 +221,7 @@ public: boost::asio::io_service& getIOService () { - return mIOService; + return m_mainService; } LedgerMaster& getLedgerMaster () @@ -270,13 +372,12 @@ private: bool loadOldLedger (const std::string&, bool); private: - boost::asio::io_service mIOService; - boost::asio::io_service mAuxService; - // The lifetime of the io_service::work object informs the io_service - // of when the work starts and finishes. io_service::run() will not exit - // while the work object exists. - // - boost::asio::io_service::work mIOWork; + IoServiceThread m_mainService; + IoServiceThread m_auxService; + + //boost::asio::io_service mIOService; + //boost::asio::io_service mAuxService; + //boost::asio::io_service::work mIOWork; MasterLockType mMasterLock; @@ -330,13 +431,14 @@ void ApplicationImp::stop () WriteLog (lsINFO, Application) << "Received shutdown request"; StopSustain (); mShutdown = true; - mIOService.stop (); + m_mainService.stop (); m_nodeStore = nullptr; mValidations->flush (); - mAuxService.stop (); + m_auxService.stop (); mJobQueue.shutdown (); - WriteLog (lsINFO, Application) << "Stopped: " << mIOService.stopped (); + //WriteLog (lsINFO, Application) << "Stopped: " << mIOService.stopped + mShutdown = false; } @@ -404,8 +506,6 @@ void ApplicationImp::setup () LogPartition::setSeverity (lsDEBUG); } - boost::thread (BIND_TYPE (runAux, boost::ref (mAuxService))).detach (); - if (!getConfig ().RUN_STANDALONE) mSNTPClient.init (getConfig ().SNTP_SERVERS); @@ -498,7 +598,7 @@ void ApplicationImp::setup () getConfig ().PEER_IP, getConfig ().PEER_PORT, getConfig ().PEER_SSL_CIPHER_LIST, - mIOService); + m_mainService); } catch (const std::exception& e) { @@ -520,7 +620,7 @@ void ApplicationImp::setup () { try { - mRPCDoor = new RPCDoor (mIOService, m_rpcServerHandler); + mRPCDoor = new RPCDoor (m_mainService, m_rpcServerHandler); } catch (const std::exception& e) { @@ -601,11 +701,6 @@ void ApplicationImp::setup () void ApplicationImp::run () { - if (getConfig ().NODE_SIZE >= 2) - { - boost::thread (BIND_TYPE (runIO, boost::ref (mIOService))).detach (); - } - if (!getConfig ().RUN_STANDALONE) { // VFALCO NOTE This seems unnecessary. If we properly refactor the load @@ -614,7 +709,7 @@ void ApplicationImp::run () getApp().getLoadManager ().activateDeadlockDetector (); } - mIOService.run (); // This blocks + m_mainService.run (); // This blocks until the io_service is stopped. if (mWSPublicDoor) mWSPublicDoor->stop ();