Use beast::Thread for io_services

Conflicts:
	TODO.txt
This commit is contained in:
Vinnie Falco
2013-07-28 14:58:03 -07:00
parent f5b14b4ef5
commit 7110aacabc
2 changed files with 132 additions and 27 deletions

View File

@@ -43,6 +43,16 @@ David Feature:
- Add "skipped" field to beginTestCase() to disable a test but still record - Add "skipped" field to beginTestCase() to disable a test but still record
that it was skipped in the output. Like for mdb import. that it was skipped in the output. Like for mdb import.
- use beast DeadlineTimer for sweep in Application
- Make SNTP Client have its own io_service
- Get rid of 'ref' typedefs that really mean const&
- Use secp256k1 from beast
- Fix xsd/dtd line in JUnit XML output
- Get rid of the WriteLog() stuff in the ripple tests and make it report the - Get rid of the WriteLog() stuff in the ripple tests and make it report the
message directly to the UnitTest object. Then update the JUnit XML output message directly to the UnitTest object. Then update the JUnit XML output
routines to also write the auxiliary messages. routines to also write the auxiliary messages.

View File

@@ -20,6 +20,106 @@ class ApplicationImp
, LeakChecked <ApplicationImp> , LeakChecked <ApplicationImp>
{ {
public: public:
// RAII container for a boost::asio::io_service run by beast threads
class IoServiceThread
{
public:
IoServiceThread (String const& name,
int expectedConcurrency,
int numberOfExtraThreads = 0)
: m_name (name)
, m_service (expectedConcurrency)
, m_work (m_service)
{
m_threads.ensureStorageAllocated (numberOfExtraThreads);
for (int i = 0; i < numberOfExtraThreads; ++i)
m_threads.add (new ServiceThread (m_name, m_service));
}
~IoServiceThread ()
{
m_service.stop ();
// the dtor of m_threads will block until each thread exits.
}
// Run on the callers thread.
// This will block until stop is issued.
void run ()
{
Thread const* const currentThread (Thread::getCurrentThread());
String previousThreadName;
if (currentThread != nullptr)
{
previousThreadName = currentThread->getThreadName ();
}
else
{
// we're on the main thread
previousThreadName = "main"; // for vanity
}
Thread::setCurrentThreadName (m_name);
m_service.run ();
Thread::setCurrentThreadName (previousThreadName);
}
void stop ()
{
m_service.stop ();
}
boost::asio::io_service& getService ()
{
return m_service;
}
operator boost::asio::io_service& ()
{
return m_service;
}
private:
class ServiceThread : Thread
{
public:
explicit ServiceThread (String const& name, boost::asio::io_service& service)
: Thread (name)
, m_service (service)
{
startThread ();
}
~ServiceThread ()
{
m_service.stop ();
stopThread (-1); // wait forever
}
void run ()
{
m_service.run ();
}
private:
boost::asio::io_service& m_service;
};
private:
String const m_name;
boost::asio::io_service m_service;
boost::asio::io_service::work m_work;
OwnedArray <ServiceThread> m_threads;
};
//--------------------------------------------------------------------------
static ApplicationImp* createInstance () static ApplicationImp* createInstance ()
{ {
return new ApplicationImp; return new ApplicationImp;
@@ -40,14 +140,16 @@ public:
// //
: SharedSingleton <Application> (SingletonLifetime::neverDestroyed) : SharedSingleton <Application> (SingletonLifetime::neverDestroyed)
#endif #endif
, mIOService ((getConfig ().NODE_SIZE >= 2) ? 2 : 1) , m_mainService ("io",
, mIOWork (mIOService) (getConfig ().NODE_SIZE >= 2) ? 2 : 1,
(getConfig ().NODE_SIZE >= 2) ? 1 : 0)
, m_auxService ("auxio", 1, 1)
, mNetOps (new NetworkOPs (&mLedgerMaster)) , mNetOps (new NetworkOPs (&mLedgerMaster))
, m_rpcServerHandler (*mNetOps) , m_rpcServerHandler (*mNetOps)
, mTempNodeCache ("NodeCache", 16384, 90) , mTempNodeCache ("NodeCache", 16384, 90)
, mSLECache ("LedgerEntryCache", 4096, 120) , mSLECache ("LedgerEntryCache", 4096, 120)
, mSNTPClient (mAuxService) , mSNTPClient (m_auxService)
, mJobQueue (mIOService) , mJobQueue (m_mainService)
// VFALCO New stuff // VFALCO New stuff
, m_nodeStore (NodeStore::New ( , m_nodeStore (NodeStore::New (
getConfig ().nodeDatabase, getConfig ().nodeDatabase,
@@ -61,7 +163,7 @@ public:
, mValidations (IValidations::New ()) , mValidations (IValidations::New ())
, mUNL (UniqueNodeList::New ()) , mUNL (UniqueNodeList::New ())
, mProofOfWorkFactory (IProofOfWorkFactory::New ()) , mProofOfWorkFactory (IProofOfWorkFactory::New ())
, mPeers (IPeers::New (mIOService)) , mPeers (IPeers::New (m_mainService))
, m_loadManager (ILoadManager::New ()) , m_loadManager (ILoadManager::New ())
// VFALCO End new stuff // VFALCO End new stuff
// VFALCO TODO replace all NULL with nullptr // VFALCO TODO replace all NULL with nullptr
@@ -73,7 +175,7 @@ public:
, mRPCDoor (NULL) , mRPCDoor (NULL)
, mWSPublicDoor (NULL) , mWSPublicDoor (NULL)
, mWSPrivateDoor (NULL) , mWSPrivateDoor (NULL)
, mSweepTimer (mAuxService) , mSweepTimer (m_auxService)
, mShutdown (false) , mShutdown (false)
{ {
// VFALCO TODO remove these once the call is thread safe. // VFALCO TODO remove these once the call is thread safe.
@@ -119,7 +221,7 @@ public:
boost::asio::io_service& getIOService () boost::asio::io_service& getIOService ()
{ {
return mIOService; return m_mainService;
} }
LedgerMaster& getLedgerMaster () LedgerMaster& getLedgerMaster ()
@@ -270,13 +372,12 @@ private:
bool loadOldLedger (const std::string&, bool); bool loadOldLedger (const std::string&, bool);
private: private:
boost::asio::io_service mIOService; IoServiceThread m_mainService;
boost::asio::io_service mAuxService; IoServiceThread m_auxService;
// The lifetime of the io_service::work object informs the io_service
// of when the work starts and finishes. io_service::run() will not exit //boost::asio::io_service mIOService;
// while the work object exists. //boost::asio::io_service mAuxService;
// //boost::asio::io_service::work mIOWork;
boost::asio::io_service::work mIOWork;
MasterLockType mMasterLock; MasterLockType mMasterLock;
@@ -330,13 +431,14 @@ void ApplicationImp::stop ()
WriteLog (lsINFO, Application) << "Received shutdown request"; WriteLog (lsINFO, Application) << "Received shutdown request";
StopSustain (); StopSustain ();
mShutdown = true; mShutdown = true;
mIOService.stop (); m_mainService.stop ();
m_nodeStore = nullptr; m_nodeStore = nullptr;
mValidations->flush (); mValidations->flush ();
mAuxService.stop (); m_auxService.stop ();
mJobQueue.shutdown (); mJobQueue.shutdown ();
WriteLog (lsINFO, Application) << "Stopped: " << mIOService.stopped (); //WriteLog (lsINFO, Application) << "Stopped: " << mIOService.stopped
mShutdown = false; mShutdown = false;
} }
@@ -404,8 +506,6 @@ void ApplicationImp::setup ()
LogPartition::setSeverity (lsDEBUG); LogPartition::setSeverity (lsDEBUG);
} }
boost::thread (BIND_TYPE (runAux, boost::ref (mAuxService))).detach ();
if (!getConfig ().RUN_STANDALONE) if (!getConfig ().RUN_STANDALONE)
mSNTPClient.init (getConfig ().SNTP_SERVERS); mSNTPClient.init (getConfig ().SNTP_SERVERS);
@@ -498,7 +598,7 @@ void ApplicationImp::setup ()
getConfig ().PEER_IP, getConfig ().PEER_IP,
getConfig ().PEER_PORT, getConfig ().PEER_PORT,
getConfig ().PEER_SSL_CIPHER_LIST, getConfig ().PEER_SSL_CIPHER_LIST,
mIOService); m_mainService);
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
@@ -520,7 +620,7 @@ void ApplicationImp::setup ()
{ {
try try
{ {
mRPCDoor = new RPCDoor (mIOService, m_rpcServerHandler); mRPCDoor = new RPCDoor (m_mainService, m_rpcServerHandler);
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
@@ -601,11 +701,6 @@ void ApplicationImp::setup ()
void ApplicationImp::run () void ApplicationImp::run ()
{ {
if (getConfig ().NODE_SIZE >= 2)
{
boost::thread (BIND_TYPE (runIO, boost::ref (mIOService))).detach ();
}
if (!getConfig ().RUN_STANDALONE) if (!getConfig ().RUN_STANDALONE)
{ {
// VFALCO NOTE This seems unnecessary. If we properly refactor the load // VFALCO NOTE This seems unnecessary. If we properly refactor the load
@@ -614,7 +709,7 @@ void ApplicationImp::run ()
getApp().getLoadManager ().activateDeadlockDetector (); getApp().getLoadManager ().activateDeadlockDetector ();
} }
mIOService.run (); // This blocks m_mainService.run (); // This blocks until the io_service is stopped.
if (mWSPublicDoor) if (mWSPublicDoor)
mWSPublicDoor->stop (); mWSPublicDoor->stop ();