Refactor Application shutdown using new Service, AsyncService interfaces

This commit is contained in:
Vinnie Falco
2013-09-17 17:32:54 -07:00
parent 97e961a048
commit 89b1859929
57 changed files with 2690 additions and 1602 deletions

View File

@@ -210,6 +210,12 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple_app\main\IoServicePool.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple_app\main\ParameterTable.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
@@ -740,6 +746,12 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple_basics\utility\Service.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple_basics\utility\StringUtilities.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
@@ -985,6 +997,12 @@
<ClCompile Include="..\..\src\ripple_hyperleveldb\ripple_hyperleveldb.cpp" />
<ClCompile Include="..\..\src\ripple_leveldb\ripple_leveldb.cpp" />
<ClCompile Include="..\..\src\ripple_mdb\ripple_mdb.c" />
<ClCompile Include="..\..\src\ripple_net\basics\AsyncService.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple_net\basics\RippleSSLContext.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
@@ -1459,6 +1477,7 @@
<ClInclude Include="..\..\src\ripple_app\ledger\LedgerEntrySet.h" />
<ClInclude Include="..\..\src\ripple_app\ledger\LedgerHistory.h" />
<ClInclude Include="..\..\src\ripple_app\ledger\SerializedValidation.h" />
<ClInclude Include="..\..\src\ripple_app\main\IoServicePool.h" />
<ClInclude Include="..\..\src\ripple_app\main\ParameterTable.h" />
<ClInclude Include="..\..\src\ripple_app\main\Application.h" />
<ClInclude Include="..\..\src\ripple_app\main\FatalErrorReporter.h" />
@@ -1558,6 +1577,7 @@
<ClInclude Include="..\..\src\ripple_basics\utility\IniFile.h" />
<ClInclude Include="..\..\src\ripple_basics\utility\PlatformMacros.h" />
<ClInclude Include="..\..\src\ripple_basics\utility\RandomNumbers.h" />
<ClInclude Include="..\..\src\ripple_basics\utility\Service.h" />
<ClInclude Include="..\..\src\ripple_basics\utility\StringUtilities.h" />
<ClInclude Include="..\..\src\ripple_basics\utility\Sustain.h" />
<ClInclude Include="..\..\src\ripple_basics\utility\ThreadName.h" />
@@ -1610,6 +1630,7 @@
<ClInclude Include="..\..\src\ripple_hyperleveldb\ripple_hyperleveldb.h" />
<ClInclude Include="..\..\src\ripple_leveldb\ripple_leveldb.h" />
<ClInclude Include="..\..\src\ripple_mdb\ripple_mdb.h" />
<ClInclude Include="..\..\src\ripple_net\basics\AsyncService.h" />
<ClInclude Include="..\..\src\ripple_net\basics\impl\MultiSocketType.h" />
<ClInclude Include="..\..\src\ripple_net\basics\impl\RPCServerImp.h" />
<ClInclude Include="..\..\src\ripple_net\basics\RippleSSLContext.h" />

View File

@@ -19,9 +19,6 @@
<Filter Include="[1] Ripple\ripple_leveldb">
<UniqueIdentifier>{487f6b35-d0a3-4b34-85c1-94e2aad4c9ff}</UniqueIdentifier>
</Filter>
<Filter Include="[1] Ripple\ripple_websocket">
<UniqueIdentifier>{6ad1f6a2-1710-43ac-96d4-f6b54fd8379e}</UniqueIdentifier>
</Filter>
<Filter Include="[1] Ripple\ripple_basics\containers">
<UniqueIdentifier>{2bf8c9ef-13f7-477a-8a78-2a52a26bb4f8}</UniqueIdentifier>
</Filter>
@@ -46,9 +43,6 @@
<Filter Include="[1] Ripple\ripple_data\utility">
<UniqueIdentifier>{a837f3ce-75b7-4e6c-b8b1-728b6a1216bd}</UniqueIdentifier>
</Filter>
<Filter Include="[1] Ripple\ripple_websocket\autosocket">
<UniqueIdentifier>{99ac4d07-04a7-4ce3-96c7-b8ea578f1a61}</UniqueIdentifier>
</Filter>
<Filter Include="[1] Ripple\ripple_net">
<UniqueIdentifier>{c84fc3af-f487-4eba-af78-d4be009f76d1}</UniqueIdentifier>
</Filter>
@@ -175,6 +169,12 @@
<Filter Include="[2] Ripple %28New%29\beast">
<UniqueIdentifier>{458b9099-fcf6-49fe-b3fb-a27beb2ee070}</UniqueIdentifier>
</Filter>
<Filter Include="[1] Ripple\ripple_websocket">
<UniqueIdentifier>{6ad1f6a2-1710-43ac-96d4-f6b54fd8379e}</UniqueIdentifier>
</Filter>
<Filter Include="[1] Ripple\ripple_websocket\autosocket">
<UniqueIdentifier>{99ac4d07-04a7-4ce3-96c7-b8ea578f1a61}</UniqueIdentifier>
</Filter>
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\..\src\ripple_basics\containers\RangeSet.cpp">
@@ -909,6 +909,15 @@
<ClCompile Include="..\..\src\ripple\beast\ripple_beastc.c">
<Filter>[2] Ripple %28New%29\beast</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple_basics\utility\Service.cpp">
<Filter>[1] Ripple\ripple_basics\utility</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple_app\main\IoServicePool.cpp">
<Filter>[1] Ripple\ripple_app\main</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple_net\basics\AsyncService.cpp">
<Filter>[1] Ripple\ripple_net\basics</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\src\ripple_basics\containers\KeyCache.h">
@@ -1803,6 +1812,15 @@
<ClInclude Include="..\..\src\ripple_hyperleveldb\ripple_hyperleveldb.h">
<Filter>[1] Ripple\ripple_hyperleveldb</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_basics\utility\Service.h">
<Filter>[1] Ripple\ripple_basics\utility</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_app\main\IoServicePool.h">
<Filter>[1] Ripple\ripple_app\main</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_net\basics\AsyncService.h">
<Filter>[1] Ripple\ripple_net\basics</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<CustomBuild Include="..\..\src\ripple_data\protocol\ripple.proto">

View File

@@ -22,7 +22,7 @@ class Manager : public Uncopyable
public:
/** Create a new Manager object.
*/
static Manager* New (Journal journal);
static Manager* New (Service& parent, Journal journal);
/** Destroy the object.

View File

@@ -93,13 +93,15 @@ namespace Validators
class ManagerImp
: public Manager
, private ThreadWithCallQueue::EntryPoints
, private DeadlineTimer::Listener
, private LeakChecked <ManagerImp>
, public Service
, public ThreadWithCallQueue::EntryPoints
, public DeadlineTimer::Listener
, public LeakChecked <ManagerImp>
{
public:
explicit ManagerImp (Journal journal)
: m_store (journal)
ManagerImp (Service& parent, Journal journal)
: Service ("Validators::Manager", parent)
, m_store (journal)
, m_logic (m_store, journal)
, m_journal (journal)
, m_thread ("Validators")
@@ -111,8 +113,21 @@ public:
~ManagerImp ()
{
m_thread.stop (true);
}
//--------------------------------------------------------------------------
//
// Service
//
void onServiceStop ()
{
m_thread.stop (false);
}
//--------------------------------------------------------------------------
void addStrings (String name, std::vector <std::string> const& strings)
{
StringArray stringArray;
@@ -160,7 +175,7 @@ public:
void receiveValidation (ReceivedValidation const& rv)
{
#if ! RIPPLE_VALIDATORS_DISABLE_MANAGER
#if RIPPLE_USE_NEW_VALIDATORS
m_thread.call (&Logic::receiveValidation, &m_logic, rv);
#endif
}
@@ -169,7 +184,7 @@ public:
void onDeadlineTimer (DeadlineTimer& timer)
{
#if ! RIPPLE_VALIDATORS_DISABLE_MANAGER
#if RIPPLE_USE_NEW_VALIDATORS
if (timer == m_checkTimer)
{
m_checkSources = true;
@@ -184,7 +199,7 @@ public:
void threadInit ()
{
#if ! RIPPLE_VALIDATORS_DISABLE_MANAGER
#if RIPPLE_USE_NEW_VALIDATORS
File const file (File::getSpecialLocation (
File::userDocumentsDirectory).getChildFile ("validators.sqlite"));
@@ -211,13 +226,15 @@ public:
void threadExit ()
{
// must come last
serviceStopped ();
}
bool threadIdle ()
{
bool interrupted = false;
#if ! RIPPLE_VALIDATORS_DISABLE_MANAGER
#if RIPPLE_USE_NEW_VALIDATORS
if (m_checkSources)
{
ThreadCancelCallback cancelCallback (m_thread);
@@ -250,9 +267,9 @@ private:
//------------------------------------------------------------------------------
Manager* Manager::New (Journal journal)
Manager* Manager::New (Service& parent, Journal journal)
{
return new ManagerImp (journal);
return new ManagerImp (parent, journal);
}
}

View File

@@ -523,17 +523,8 @@ void LedgerConsensus::stateAccepted ()
endConsensus ();
}
// VFALCO TODO implement shutdown without a naked global
extern volatile bool doShutdown;
void LedgerConsensus::timerEntry ()
{
if (doShutdown)
{
WriteLog (lsFATAL, LedgerConsensus) << "Shutdown requested";
getApp().stop ();
}
if ((mState != lcsFINISHED) && (mState != lcsACCEPTED))
checkLCL ();

View File

@@ -6,8 +6,9 @@
typedef std::pair<uint256, InboundLedger::pointer> u256_acq_pair;
InboundLedgers::InboundLedgers ()
: mLock (this, "InboundLedger", __FILE__, __LINE__)
InboundLedgers::InboundLedgers (Service& parent)
: Service ("InboundLedgers", parent)
, mLock (this, "InboundLedger", __FILE__, __LINE__)
, mRecentFailures ("LedgerAcquireRecentFailures", 0, kReacquireIntervalSeconds)
{
}
@@ -20,6 +21,8 @@ InboundLedger::pointer InboundLedgers::findCreate (uint256 const& hash, uint32 s
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (! isServiceStopping ())
{
boost::unordered_map<uint256, InboundLedger::pointer>::iterator it = mLedgers.find (hash);
if (it != mLedgers.end ())
{
@@ -49,6 +52,7 @@ InboundLedger::pointer InboundLedgers::findCreate (uint256 const& hash, uint32 s
}
}
}
}
return ret;
}
@@ -348,4 +352,12 @@ Json::Value InboundLedgers::getInfo()
return ret;
}
// vim:ts=4
void InboundLedgers::onServiceStop ()
{
ScopedLockType lock (mLock, __FILE__, __LINE__);
mLedgers.clear();
mRecentFailures.clear();
serviceStopped();
}

View File

@@ -13,13 +13,15 @@
*/
// VFALCO TODO Rename to InboundLedgers
// VFALCO TODO Create abstract interface
class InboundLedgers : LeakChecked <InboundLedger>
class InboundLedgers
: public Service
, public LeakChecked <InboundLedger>
{
public:
// How long before we try again to acquire the same ledger
static const int kReacquireIntervalSeconds = 300;
InboundLedgers ();
explicit InboundLedgers (Service& parent);
// VFALCO TODO Should this be called findOrAdd ?
//
@@ -60,6 +62,8 @@ public:
void gotFetchPack (Job&);
void sweep ();
void onServiceStop ();
private:
typedef boost::unordered_map <uint256, InboundLedger::pointer> MapType;

View File

@@ -313,7 +313,7 @@ bool LedgerMaster::getValidatedRange (uint32& minVal, uint32& maxVal)
return true;
}
void LedgerMaster::tryFill (Ledger::pointer ledger)
void LedgerMaster::tryFill (Job& job, Ledger::pointer ledger)
{
uint32 seq = ledger->getLedgerSeq ();
uint256 prevHash = ledger->getParentHash ();
@@ -323,7 +323,7 @@ void LedgerMaster::tryFill (Ledger::pointer ledger)
uint32 minHas = ledger->getLedgerSeq ();
uint32 maxHas = ledger->getLedgerSeq ();
while (seq > 0)
while (! job.shouldCancel() && seq > 0)
{
{
ScopedLockType ml (mLock, __FILE__, __LINE__);
@@ -616,7 +616,7 @@ void LedgerMaster::advanceThread()
{ // Previous ledger is in DB
sl.lock(__FILE__, __LINE__);
mFillInProgress = ledger->getLedgerSeq();
getApp().getJobQueue().addJob(jtADVANCE, "tryFill", BIND_TYPE (&LedgerMaster::tryFill, this, ledger));
getApp().getJobQueue().addJob(jtADVANCE, "tryFill", BIND_TYPE (&LedgerMaster::tryFill, this, P_1, ledger));
sl.unlock();
}
progress = true;
@@ -673,7 +673,7 @@ void LedgerMaster::advanceThread()
{
mPathFindThread = true;
getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths",
BIND_TYPE (&LedgerMaster::updatePaths, this));
BIND_TYPE (&LedgerMaster::updatePaths, this, P_1));
}
}
if (progress)
@@ -824,11 +824,11 @@ uint256 LedgerMaster::getLedgerHash(uint32 desiredSeq, Ledger::ref knownGoodLedg
return hash;
}
void LedgerMaster::updatePaths ()
void LedgerMaster::updatePaths (Job& job)
{
Ledger::pointer lastLedger;
do
while (! job.shouldCancel())
{
bool newOnly = true;
@@ -856,10 +856,8 @@ void LedgerMaster::updatePaths ()
}
// VFALCO TODO Fix this global variable
PathRequest::updateAll (lastLedger, newOnly);
PathRequest::updateAll (lastLedger, newOnly, job.getCancelCallback ());
}
while (1);
}
void LedgerMaster::newPathRequest ()
@@ -871,8 +869,6 @@ void LedgerMaster::newPathRequest ()
{
mPathFindThread = true;
getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths",
BIND_TYPE (&LedgerMaster::updatePaths, this));
BIND_TYPE (&LedgerMaster::updatePaths, this, P_1));
}
}
// vim:ts=4

View File

@@ -14,7 +14,9 @@
// VFALCO TODO Rename to Ledgers
// It sounds like this holds all the ledgers...
//
class LedgerMaster : LeakChecked <LedgerMaster>
class LedgerMaster
: public Service
, public LeakChecked <LedgerMaster>
{
public:
typedef FUNCTION_TYPE <void (Ledger::ref)> callback;
@@ -23,8 +25,9 @@ public:
typedef RippleRecursiveMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
LedgerMaster ()
: mLock (this, "LedgerMaster", __FILE__, __LINE__)
explicit LedgerMaster (Service& parent)
: Service ("LedgerMaster", parent)
, mLock (this, "LedgerMaster", __FILE__, __LINE__)
, mHeldTransactions (uint256 ())
, mMinValidations (0)
, mLastValidateSeq (0)
@@ -198,9 +201,9 @@ private:
bool isTransactionOnFutureList (Transaction::ref trans);
void getFetchPack (Ledger::ref have);
void tryFill (Ledger::pointer);
void tryFill (Job&, Ledger::pointer);
void advanceThread ();
void updatePaths ();
void updatePaths (Job&);
private:
LockType mLock;

View File

@@ -6,8 +6,9 @@
SETUP_LOG (OrderBookDB)
OrderBookDB::OrderBookDB ()
: mLock (this, "OrderBookDB", __FILE__, __LINE__)
OrderBookDB::OrderBookDB (Service& parent)
: Service ("OrderBookDB", parent)
, mLock (this, "OrderBookDB", __FILE__, __LINE__)
, mSeq (0)
{

View File

@@ -12,14 +12,20 @@
// But, for now it is probably faster to just generate it each time.
//
//------------------------------------------------------------------------------
typedef std::pair<uint160, uint160> currencyIssuer_t;
//------------------------------------------------------------------------------
#ifdef C11X
typedef std::pair<const uint160&, const uint160&> currencyIssuer_ct;
#else
typedef std::pair<uint160, uint160> currencyIssuer_ct; // C++ defect 106
#endif
//------------------------------------------------------------------------------
class BookListeners
{
public:
@@ -37,10 +43,15 @@ private:
boost::unordered_map<uint64, InfoSub::wptr> mListeners;
};
class OrderBookDB : LeakChecked <OrderBookDB>
//------------------------------------------------------------------------------
class OrderBookDB
: public Service
, public LeakChecked <OrderBookDB>
{
public:
OrderBookDB ();
explicit OrderBookDB (Service& parent);
void setup (Ledger::ref ledger);
void update (Ledger::pointer ledger);
void invalidate ();

File diff suppressed because it is too large Load Diff

View File

@@ -4,8 +4,8 @@
*/
//==============================================================================
#ifndef RIPPLE_IAPPLICATION_H
#define RIPPLE_IAPPLICATION_H
#ifndef RIPPLE_APP_APPLICATION_H_INCLUDED
#define RIPPLE_APP_APPLICATION_H_INCLUDED
namespace Validators { class Manager; }
@@ -120,7 +120,6 @@ public:
virtual void setup () = 0;
virtual void run () = 0;
virtual void stop () = 0;
virtual void sweep () = 0;
};
extern Application& getApp ();

View File

@@ -0,0 +1,123 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
// One worker thread that runs the shared io_service's event loop.
//
// Construction only prepares the thread; it is launched later via
// start() (called from IoServicePool::runAsync).
class IoServicePool::ServiceThread : private Thread
{
public:
// Prepare (but do not launch) the thread.
// Note the deliberately commented-out startThread() below:
// launching is deferred to start().
explicit ServiceThread (
String const& name,
IoServicePool& owner,
boost::asio::io_service& service)
: Thread (name)
, m_owner (owner)
, m_service (service)
{
//startThread ();
}
// Stop the io_service (causing run() to return) and join the
// thread before destruction completes.
~ServiceThread ()
{
m_service.stop ();
// block until thread exits
stopThread ();
}
// Launch the thread; run() below is the thread entry point.
void start ()
{
startThread ();
}
// Thread body: blocks in io_service::run() until the service is
// stopped, then tells the owning pool this thread is exiting.
void run ()
{
m_service.run ();
m_owner.onThreadExit();
}
private:
IoServicePool& m_owner;             // owning pool (non-owning back-reference)
boost::asio::io_service& m_service; // io_service shared by all pool threads
};
//------------------------------------------------------------------------------
// Create the pool with `numberOfThreads` worker threads (must be > 0).
//
// The io_service receives numberOfThreads as its concurrency hint, and
// m_work keeps io_service::run() from returning while there is no
// pending I/O. Threads are constructed here but not launched until
// runAsync() is called.
//
// NOTE(review): `name` is presumably how this node appears in the
// parent Service's shutdown hierarchy — confirm against Service's API.
IoServicePool::IoServicePool (Service& parent, String const& name, int numberOfThreads)
: Service (name.toStdString().c_str(), parent)
, m_name (name)
, m_service (numberOfThreads)
, m_work (boost::ref (m_service))
, m_running (false)
{
bassert (numberOfThreads > 0);
m_threads.ensureStorageAllocated (numberOfThreads);
for (int i = 0; i < numberOfThreads; ++i)
{
// Count each thread before creating it, so onThreadExit's
// decrement can never observe zero prematurely.
++m_threadsRunning;
m_threads.add (new ServiceThread (m_name, *this, m_service));
}
}
// Destroy the pool. Each ServiceThread's destructor stops the
// io_service and joins its thread, so this blocks until all
// workers have exited.
IoServicePool::~IoServicePool ()
{
// must have called runAsync()
bassert (m_running);
// the dtor of m_threads will block until each thread exits.
}
// Launch every worker thread. Must be called exactly once per pool;
// the destructor asserts that this happened.
void IoServicePool::runAsync ()
{
    // calling this twice is a usage error
    bassert (! m_running);
    m_running = true;

    int const threadCount = m_threads.size ();
    for (int index = 0; index < threadCount; ++index)
        m_threads [index]->start ();
}
// Access the pool's io_service, e.g. to post handlers or construct
// I/O objects against it.
boost::asio::io_service& IoServicePool::getService ()
{
return m_service;
}
// Implicit conversion so an IoServicePool can be passed anywhere a
// boost::asio::io_service& is expected.
IoServicePool::operator boost::asio::io_service& ()
{
return m_service;
}
// Service interface: invoked when a stop is requested for this node.
// Stopping the io_service makes every worker's run() return, which in
// turn drives onThreadExit() on each thread.
void IoServicePool::onServiceStop ()
{
// VFALCO NOTE This is a hack! We should gracefully
// cancel all pending I/O, and delete the work
// object using boost::optional, and let run()
// just return naturally.
//
m_service.stop ();
}
// Service interface: invoked once all child Services have stopped.
// Intentionally empty — this pool reports itself stopped from
// onThreadExit() when the last worker leaves io_service::run().
void IoServicePool::onServiceChildrenStopped ()
{
}
// Called every time io_service::run() returns and a thread will exit.
//
// Decrements the atomic count of live workers; the thread that brings
// it to zero signals shutdown completion via serviceStopped().
void IoServicePool::onThreadExit()
{
// service must be stopping for threads to exit.
bassert (isServiceStopping());
// must have at least count 1
bassert (m_threadsRunning.get() > 0);
if (--m_threadsRunning == 0)
{
// last thread just exited
serviceStopped ();
}
}

View File

@@ -0,0 +1,38 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_APP_IOSERVICEPOOL_H_INCLUDED
#define RIPPLE_APP_IOSERVICEPOOL_H_INCLUDED
/** An io_service with an associated group of threads.

    Participates in the Service shutdown hierarchy: onServiceStop()
    stops the io_service, and the pool reports itself stopped once the
    last worker thread has exited run().
*/
class IoServicePool : public Service
{
public:
/** Create the pool as a child of `parent`.
    `numberOfThreads` must be > 0; threads are not started yet.
*/
IoServicePool (Service& parent, String const& name, int numberOfThreads);
/** Destroy the pool, blocking until every thread has exited.
    Precondition: runAsync() was called.
*/
~IoServicePool ();
/** Launch all worker threads. Must be called exactly once. */
void runAsync ();
/** Access the underlying io_service. */
boost::asio::io_service& getService ();
/** Implicit conversion to the underlying io_service. */
operator boost::asio::io_service& ();
// Service overrides
void onServiceStop ();
void onServiceChildrenStopped ();
private:
class ServiceThread;
// Called by each ServiceThread as io_service::run() returns.
void onThreadExit();
String m_name;                 // name given to Service and to threads
boost::asio::io_service m_service;
boost::optional <boost::asio::io_service::work> m_work; // keeps run() from returning early
OwnedArray <ServiceThread> m_threads;
Atomic <int> m_threadsRunning; // live worker count; last one to exit signals stop
bool m_running;                // set by runAsync(); checked by the destructor
};
#endif

View File

@@ -426,7 +426,6 @@ int RippleMain::run (int argc, char const* const* argv)
// No arguments. Run server.
ScopedPointer <Application> app (Application::New ());
setupServer ();
setCallingThreadName ("io");
startServer ();
}
else

View File

@@ -4,8 +4,6 @@
*/
//==============================================================================
SETUP_LOG (NetworkOPs)
class NetworkOPsImp
: public NetworkOPs
, public DeadlineTimer::Listener
@@ -22,8 +20,10 @@ public:
public:
// VFALCO TODO Make LedgerMaster a SharedPtr or a reference.
//
NetworkOPsImp (LedgerMaster& ledgerMaster)
: mLock (this, "NetOPs", __FILE__, __LINE__)
NetworkOPsImp (LedgerMaster& ledgerMaster, Service& parent, Journal journal)
: NetworkOPs (parent)
, m_journal (journal)
, mLock (this, "NetOPs", __FILE__, __LINE__)
, mMode (omDISCONNECTED)
, mNeedNetworkLedger (false)
, mProposing (false)
@@ -358,8 +358,17 @@ public:
InfoSub::pointer findRpcSub (const std::string& strUrl);
InfoSub::pointer addRpcSub (const std::string& strUrl, InfoSub::ref rspEntry);
//
//--------------------------------------------------------------------------
//
// Service
void onServiceStop ()
{
m_heartbeatTimer.cancel();
m_clusterTimer.cancel();
serviceStopped ();
}
private:
void setHeartbeatTimer ();
@@ -389,6 +398,8 @@ private:
// XXX Split into more locks.
typedef RippleRecursiveMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
Journal m_journal;
LockType mLock;
OperatingMode mMode;
@@ -484,7 +495,7 @@ void NetworkOPsImp::processHeartbeatTimer ()
if (mMode != omDISCONNECTED)
{
setMode (omDISCONNECTED);
WriteLog (lsWARNING, NetworkOPs)
m_journal.warning
<< "Node count (" << numPeers << ") "
<< "has fallen below quorum (" << getConfig ().NETWORK_QUORUM << ").";
}
@@ -495,7 +506,7 @@ void NetworkOPsImp::processHeartbeatTimer ()
if (mMode == omDISCONNECTED)
{
setMode (omCONNECTED);
WriteLog (lsINFO, NetworkOPs) << "Node count (" << numPeers << ") is sufficient.";
m_journal.info << "Node count (" << numPeers << ") is sufficient.";
}
// Check if the last validated ledger forces a change between these states
@@ -520,11 +531,11 @@ void NetworkOPsImp::processHeartbeatTimer ()
void NetworkOPsImp::processClusterTimer ()
{
bool synced = (getApp().getLedgerMaster().getValidatedLedgerAge() <= 240);
bool synced = (m_ledgerMaster.getValidatedLedgerAge() <= 240);
ClusterNodeStatus us("", synced ? getApp().getFeeTrack().getLocalFee() : 0, getNetworkTimeNC());
if (!getApp().getUNL().nodeUpdate(getApp().getLocalCredentials().getNodePublic(), us))
{
WriteLog (lsDEBUG, NetworkOPs) << "To soon to send cluster update";
m_journal.debug << "To soon to send cluster update";
return;
}
@@ -615,7 +626,8 @@ void NetworkOPsImp::closeTimeOffset (int offset)
else
mCloseTimeOffset = (mCloseTimeOffset * 3) / 4;
CondLog (mCloseTimeOffset != 0, lsINFO, NetworkOPs) << "Close time offset now " << mCloseTimeOffset;
if (mCloseTimeOffset != 0)
m_journal.info << "Close time offset now " << mCloseTimeOffset;
}
uint32 NetworkOPsImp::getLedgerID (uint256 const& hash)
@@ -678,13 +690,13 @@ void NetworkOPsImp::submitTransaction (Job&, SerializedTransaction::pointer iTra
if (getApp().getHashRouter ().addSuppressionPeer (suppress, 0, flags) && ((flags & SF_RETRY) != 0))
{
WriteLog (lsWARNING, NetworkOPs) << "Redundant transactions submitted";
m_journal.warning << "Redundant transactions submitted";
return;
}
if ((flags & SF_BAD) != 0)
{
WriteLog (lsWARNING, NetworkOPs) << "Submitted transaction cached bad";
m_journal.warning << "Submitted transaction cached bad";
return;
}
@@ -694,7 +706,7 @@ void NetworkOPsImp::submitTransaction (Job&, SerializedTransaction::pointer iTra
{
if (!trans->checkSign ())
{
WriteLog (lsWARNING, NetworkOPs) << "Submitted transaction has bad signature";
m_journal.warning << "Submitted transaction has bad signature";
getApp().getHashRouter ().setFlag (suppress, SF_BAD);
return;
}
@@ -703,7 +715,7 @@ void NetworkOPsImp::submitTransaction (Job&, SerializedTransaction::pointer iTra
}
catch (...)
{
WriteLog (lsWARNING, NetworkOPs) << "Exception checking transaction " << suppress;
m_journal.warning << "Exception checking transaction " << suppress;
return;
}
}
@@ -734,9 +746,9 @@ Transaction::pointer NetworkOPsImp::submitTransactionSync (Transaction::ref tpTr
}
else
{
WriteLog (lsFATAL, NetworkOPs) << "Transaction reconstruction failure";
WriteLog (lsFATAL, NetworkOPs) << tpTransNew->getSTransaction ()->getJson (0);
WriteLog (lsFATAL, NetworkOPs) << tpTrans->getSTransaction ()->getJson (0);
m_journal.fatal << "Transaction reconstruction failure";
m_journal.fatal << tpTransNew->getSTransaction ()->getJson (0);
m_journal.fatal << tpTrans->getSTransaction ()->getJson (0);
// assert (false); "1e-95" as amount can trigger this
@@ -780,7 +792,7 @@ void NetworkOPsImp::runTransactionQueue ()
if (isTerRetry (r))
{
// transaction should be held
WriteLog (lsDEBUG, NetworkOPs) << "QTransaction should be held: " << r;
m_journal.debug << "QTransaction should be held: " << r;
dbtx->setStatus (HELD);
getApp().getMasterTransaction ().canonicalize (&dbtx);
m_ledgerMaster.addHeldTransaction (dbtx);
@@ -788,18 +800,18 @@ void NetworkOPsImp::runTransactionQueue ()
else if (r == tefPAST_SEQ)
{
// duplicate or conflict
WriteLog (lsINFO, NetworkOPs) << "QTransaction is obsolete";
m_journal.info << "QTransaction is obsolete";
dbtx->setStatus (OBSOLETE);
}
else if (r == tesSUCCESS)
{
WriteLog (lsINFO, NetworkOPs) << "QTransaction is now included in open ledger";
m_journal.info << "QTransaction is now included in open ledger";
dbtx->setStatus (INCLUDED);
getApp().getMasterTransaction ().canonicalize (&dbtx);
}
else
{
WriteLog (lsDEBUG, NetworkOPs) << "QStatus other than success " << r;
m_journal.debug << "QStatus other than success " << r;
dbtx->setStatus (INVALID);
}
@@ -809,7 +821,7 @@ void NetworkOPsImp::runTransactionQueue ()
if (getApp().getHashRouter ().swapSet (txn->getID (), peers, SF_RELAYED))
{
WriteLog (lsDEBUG, NetworkOPs) << "relaying";
m_journal.debug << "relaying";
protocol::TMTransaction tx;
Serializer s;
dbtx->getSTransaction ()->add (s);
@@ -821,7 +833,7 @@ void NetworkOPsImp::runTransactionQueue ()
getApp().getPeers ().relayMessageBut (peers, packet);
}
else
WriteLog(lsDEBUG, NetworkOPs) << "recently relayed";
m_journal.debug << "recently relayed";
}
txn->doCallbacks (r);
@@ -852,7 +864,7 @@ Transaction::pointer NetworkOPsImp::processTransaction (Transaction::pointer tra
// signature not checked
if (!trans->checkSign ())
{
WriteLog (lsINFO, NetworkOPs) << "Transaction has bad signature";
m_journal.info << "Transaction has bad signature";
trans->setStatus (INVALID);
trans->setResult (temBAD_SIGNATURE);
getApp().getHashRouter ().setFlag (trans->getID (), SF_BAD);
@@ -879,7 +891,8 @@ Transaction::pointer NetworkOPsImp::processTransaction (Transaction::pointer tra
if (r != tesSUCCESS)
{
std::string token, human;
CondLog (transResultInfo (r, token, human), lsINFO, NetworkOPs) << "TransactionResult: " << token << ": " << human;
if (transResultInfo (r, token, human))
m_journal.info << "TransactionResult: " << token << ": " << human;
}
#endif
@@ -896,7 +909,7 @@ Transaction::pointer NetworkOPsImp::processTransaction (Transaction::pointer tra
if (r == tesSUCCESS)
{
WriteLog (lsINFO, NetworkOPs) << "Transaction is now included in open ledger";
m_journal.info << "Transaction is now included in open ledger";
trans->setStatus (INCLUDED);
// VFALCO NOTE The value of trans can be changed here!!
@@ -905,7 +918,7 @@ Transaction::pointer NetworkOPsImp::processTransaction (Transaction::pointer tra
else if (r == tefPAST_SEQ)
{
// duplicate or conflict
WriteLog (lsINFO, NetworkOPs) << "Transaction is obsolete";
m_journal.info << "Transaction is obsolete";
trans->setStatus (OBSOLETE);
}
else if (isTerRetry (r))
@@ -913,7 +926,7 @@ Transaction::pointer NetworkOPsImp::processTransaction (Transaction::pointer tra
if (!bFailHard)
{
// transaction should be held
WriteLog (lsDEBUG, NetworkOPs) << "Transaction should be held: " << r;
m_journal.debug << "Transaction should be held: " << r;
trans->setStatus (HELD);
getApp().getMasterTransaction ().canonicalize (&trans);
m_ledgerMaster.addHeldTransaction (trans);
@@ -921,7 +934,7 @@ Transaction::pointer NetworkOPsImp::processTransaction (Transaction::pointer tra
}
else
{
WriteLog (lsDEBUG, NetworkOPs) << "Status other than success " << r;
m_journal.debug << "Status other than success " << r;
trans->setStatus (INVALID);
}
@@ -992,21 +1005,21 @@ STVector256 NetworkOPsImp::getDirNodeInfo (
if (sleNode)
{
WriteLog (lsDEBUG, NetworkOPs) << "getDirNodeInfo: node index: " << uNodeIndex.ToString ();
m_journal.debug << "getDirNodeInfo: node index: " << uNodeIndex.ToString ();
WriteLog (lsTRACE, NetworkOPs) << "getDirNodeInfo: first: " << strHex (sleNode->getFieldU64 (sfIndexPrevious));
WriteLog (lsTRACE, NetworkOPs) << "getDirNodeInfo: last: " << strHex (sleNode->getFieldU64 (sfIndexNext));
m_journal.trace << "getDirNodeInfo: first: " << strHex (sleNode->getFieldU64 (sfIndexPrevious));
m_journal.trace << "getDirNodeInfo: last: " << strHex (sleNode->getFieldU64 (sfIndexNext));
uNodePrevious = sleNode->getFieldU64 (sfIndexPrevious);
uNodeNext = sleNode->getFieldU64 (sfIndexNext);
svIndexes = sleNode->getFieldV256 (sfIndexes);
WriteLog (lsTRACE, NetworkOPs) << "getDirNodeInfo: first: " << strHex (uNodePrevious);
WriteLog (lsTRACE, NetworkOPs) << "getDirNodeInfo: last: " << strHex (uNodeNext);
m_journal.trace << "getDirNodeInfo: first: " << strHex (uNodePrevious);
m_journal.trace << "getDirNodeInfo: last: " << strHex (uNodeNext);
}
else
{
WriteLog (lsINFO, NetworkOPs) << "getDirNodeInfo: node index: NOT FOUND: " << uNodeIndex.ToString ();
m_journal.info << "getDirNodeInfo: node index: NOT FOUND: " << uNodeIndex.ToString ();
uNodePrevious = 0;
uNodeNext = 0;
@@ -1176,7 +1189,7 @@ bool NetworkOPsImp::checkLastClosedLedger (const std::vector<Peer::pointer>& pee
// agree? And do we have no better ledger available?
// If so, we are either tracking or full.
WriteLog (lsTRACE, NetworkOPs) << "NetworkOPsImp::checkLastClosedLedger";
m_journal.trace << "NetworkOPsImp::checkLastClosedLedger";
Ledger::pointer ourClosed = m_ledgerMaster.getClosedLedger ();
@@ -1185,8 +1198,8 @@ bool NetworkOPsImp::checkLastClosedLedger (const std::vector<Peer::pointer>& pee
uint256 closedLedger = ourClosed->getHash ();
uint256 prevClosedLedger = ourClosed->getParentHash ();
WriteLog (lsTRACE, NetworkOPs) << "OurClosed: " << closedLedger;
WriteLog (lsTRACE, NetworkOPs) << "PrevClosed: " << prevClosedLedger;
m_journal.trace << "OurClosed: " << closedLedger;
m_journal.trace << "PrevClosed: " << prevClosedLedger;
boost::unordered_map<uint256, ValidationCount> ledgers;
{
@@ -1240,14 +1253,17 @@ bool NetworkOPsImp::checkLastClosedLedger (const std::vector<Peer::pointer>& pee
for (boost::unordered_map<uint256, ValidationCount>::iterator it = ledgers.begin (), end = ledgers.end ();
it != end; ++it)
{
WriteLog (lsDEBUG, NetworkOPs) << "L: " << it->first << " t=" << it->second.trustedValidations <<
m_journal.debug << "L: " << it->first << " t=" << it->second.trustedValidations <<
", n=" << it->second.nodesUsing;
// Temporary logging to make sure tiebreaking isn't broken
if (it->second.trustedValidations > 0)
WriteLog (lsTRACE, NetworkOPs) << " TieBreakTV: " << it->second.highValidation;
m_journal.trace << " TieBreakTV: " << it->second.highValidation;
else
CondLog (it->second.nodesUsing > 0, lsTRACE, NetworkOPs) << " TieBreakNU: " << it->second.highNodeUsing;
{
if (it->second.nodesUsing > 0)
m_journal.trace << " TieBreakNU: " << it->second.highNodeUsing;
}
if (it->second > bestVC)
{
@@ -1260,7 +1276,7 @@ bool NetworkOPsImp::checkLastClosedLedger (const std::vector<Peer::pointer>& pee
if (switchLedgers && (closedLedger == prevClosedLedger))
{
// don't switch to our own previous ledger
WriteLog (lsINFO, NetworkOPs) << "We won't switch to our own previous ledger";
m_journal.info << "We won't switch to our own previous ledger";
networkClosed = ourClosed->getHash ();
switchLedgers = false;
}
@@ -1279,9 +1295,9 @@ bool NetworkOPsImp::checkLastClosedLedger (const std::vector<Peer::pointer>& pee
return false;
}
WriteLog (lsWARNING, NetworkOPs) << "We are not running on the consensus ledger";
WriteLog (lsINFO, NetworkOPs) << "Our LCL: " << ourClosed->getJson (0);
WriteLog (lsINFO, NetworkOPs) << "Net LCL " << closedLedger;
m_journal.warning << "We are not running on the consensus ledger";
m_journal.info << "Our LCL: " << ourClosed->getJson (0);
m_journal.info << "Net LCL " << closedLedger;
if ((mMode == omTRACKING) || (mMode == omFULL))
setMode (omCONNECTED);
@@ -1290,7 +1306,7 @@ bool NetworkOPsImp::checkLastClosedLedger (const std::vector<Peer::pointer>& pee
if (!consensus)
{
WriteLog (lsINFO, NetworkOPs) << "Acquiring consensus ledger " << closedLedger;
m_journal.info << "Acquiring consensus ledger " << closedLedger;
if (!mAcquiringLedger || (mAcquiringLedger->getHash () != closedLedger))
mAcquiringLedger = getApp().getInboundLedgers ().findCreate (closedLedger, 0, true);
@@ -1298,7 +1314,7 @@ bool NetworkOPsImp::checkLastClosedLedger (const std::vector<Peer::pointer>& pee
if (!mAcquiringLedger || mAcquiringLedger->isFailed ())
{
getApp().getInboundLedgers ().dropLedger (closedLedger);
WriteLog (lsERROR, NetworkOPs) << "Network ledger cannot be acquired";
m_journal.error << "Network ledger cannot be acquired";
return true;
}
@@ -1321,9 +1337,9 @@ void NetworkOPsImp::switchLastClosedLedger (Ledger::pointer newLedger, bool duri
// set the newledger as our last closed ledger -- this is abnormal code
if (duringConsensus)
WriteLog (lsERROR, NetworkOPs) << "JUMPdc last closed ledger to " << newLedger->getHash ();
m_journal.error << "JUMPdc last closed ledger to " << newLedger->getHash ();
else
WriteLog (lsERROR, NetworkOPs) << "JUMP last closed ledger to " << newLedger->getHash ();
m_journal.error << "JUMP last closed ledger to " << newLedger->getHash ();
clearNeedNetworkLedger ();
newLedger->setClosed ();
@@ -1344,8 +1360,8 @@ void NetworkOPsImp::switchLastClosedLedger (Ledger::pointer newLedger, bool duri
int NetworkOPsImp::beginConsensus (uint256 const& networkClosed, Ledger::pointer closingLedger)
{
WriteLog (lsINFO, NetworkOPs) << "Consensus time for ledger " << closingLedger->getLedgerSeq ();
WriteLog (lsINFO, NetworkOPs) << " LCL is " << closingLedger->getParentHash ();
m_journal.info << "Consensus time for ledger " << closingLedger->getLedgerSeq ();
m_journal.info << " LCL is " << closingLedger->getParentHash ();
Ledger::pointer prevLedger = m_ledgerMaster.getLedgerByHash (closingLedger->getParentHash ());
@@ -1354,7 +1370,7 @@ int NetworkOPsImp::beginConsensus (uint256 const& networkClosed, Ledger::pointer
// this shouldn't happen unless we jump ledgers
if (mMode == omFULL)
{
WriteLog (lsWARNING, NetworkOPs) << "Don't have LCL, going to tracking";
m_journal.warning << "Don't have LCL, going to tracking";
setMode (omTRACKING);
}
@@ -1370,7 +1386,7 @@ int NetworkOPsImp::beginConsensus (uint256 const& networkClosed, Ledger::pointer
mConsensus = boost::make_shared<LedgerConsensus> (
networkClosed, prevLedger, m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC ());
WriteLog (lsDEBUG, NetworkOPs) << "Initiating consensus engine";
m_journal.debug << "Initiating consensus engine";
return mConsensus->startup ();
}
@@ -1392,7 +1408,7 @@ bool NetworkOPsImp::haveConsensusObject ()
if (!ledgerChange)
{
WriteLog (lsINFO, NetworkOPs) << "Beginning consensus due to peer action";
m_journal.info << "Beginning consensus due to peer action";
beginConsensus (networkClosed, m_ledgerMaster.getCurrentLedger ());
}
}
@@ -1418,7 +1434,7 @@ void NetworkOPsImp::processTrustedProposal (LedgerProposal::pointer proposal,
if (!haveConsensusObject ())
{
WriteLog (lsINFO, NetworkOPs) << "Received proposal outside consensus window";
m_journal.info << "Received proposal outside consensus window";
if (mMode == omFULL)
relay = false;
@@ -1431,7 +1447,7 @@ void NetworkOPsImp::processTrustedProposal (LedgerProposal::pointer proposal,
if (!set->has_previousledger () && (checkLedger != consensusLCL))
{
WriteLog (lsWARNING, NetworkOPs) << "Have to re-check proposal signature due to consensus view change";
m_journal.warning << "Have to re-check proposal signature due to consensus view change";
assert (proposal->hasSignature ());
proposal->setPrevLedger (consensusLCL);
@@ -1442,7 +1458,7 @@ void NetworkOPsImp::processTrustedProposal (LedgerProposal::pointer proposal,
if (sigGood && (consensusLCL == proposal->getPrevLedger ()))
{
relay = mConsensus->peerPosition (proposal);
WriteLog (lsTRACE, NetworkOPs) << "Proposal processing finished, relay=" << relay;
m_journal.trace << "Proposal processing finished, relay=" << relay;
}
}
@@ -1455,7 +1471,7 @@ void NetworkOPsImp::processTrustedProposal (LedgerProposal::pointer proposal,
}
else
{
WriteLog (lsINFO, NetworkOPs) << "Not relaying trusted proposal";
m_journal.info << "Not relaying trusted proposal";
}
}
}
@@ -1508,7 +1524,7 @@ SHAMapAddNode NetworkOPsImp::gotTXData (const boost::shared_ptr<Peer>& peer, uin
if (!consensus)
{
WriteLog (lsWARNING, NetworkOPs) << "Got TX data with no consensus object";
m_journal.warning << "Got TX data with no consensus object";
return SHAMapAddNode ();
}
@@ -1519,7 +1535,7 @@ bool NetworkOPsImp::hasTXSet (const boost::shared_ptr<Peer>& peer, uint256 const
{
if (!haveConsensusObject ())
{
WriteLog (lsINFO, NetworkOPs) << "Peer has TX set, not during consensus";
m_journal.info << "Peer has TX set, not during consensus";
return false;
}
@@ -1550,7 +1566,7 @@ void NetworkOPsImp::endConsensus (bool correctLCL)
{
if (it && (it->getClosedLedgerHash () == deadLedger))
{
WriteLog (lsTRACE, NetworkOPs) << "Killing obsolete peer status";
m_journal.trace << "Killing obsolete peer status";
it->cycleStatus ();
}
}
@@ -1694,7 +1710,7 @@ NetworkOPsImp::transactionsSQL (std::string selection, const RippleAddress& acco
% lexicalCastThrow <std::string> (offset)
% lexicalCastThrow <std::string> (numberOfResults)
);
WriteLog (lsTRACE, NetworkOPs) << "txSQL query: " << sql;
m_journal.trace << "txSQL query: " << sql;
return sql;
}
@@ -1731,7 +1747,7 @@ NetworkOPsImp::getAccountTxs (const RippleAddress& account, int32 minLedger, int
if (rawMeta.getLength() == 0)
{ // Work around a bug that could leave the metadata missing
uint32 seq = static_cast<uint32>(db->getBigInt("LedgerSeq"));
WriteLog(lsWARNING, NetworkOPs) << "Recovering ledger " << seq << ", txn " << txn->getID();
m_journal.warning << "Recovering ledger " << seq << ", txn " << txn->getID();
Ledger::pointer ledger = getLedgerBySeq(seq);
if (ledger)
ledger->pendSaveValidated(false, false);
@@ -1903,7 +1919,7 @@ NetworkOPsImp::getTxsAccount (const RippleAddress& account, int32 minLedger, int
if (rawMeta.getLength() == 0)
{ // Work around a bug that could leave the metadata missing
uint32 seq = static_cast<uint32>(db->getBigInt("LedgerSeq"));
WriteLog(lsWARNING, NetworkOPs) << "Recovering ledger " << seq << ", txn " << txn->getID();
m_journal.warning << "Recovering ledger " << seq << ", txn " << txn->getID();
Ledger::pointer ledger = getLedgerBySeq(seq);
if (ledger)
ledger->pendSaveValidated(false, false);
@@ -2049,7 +2065,7 @@ NetworkOPsImp::getLedgerAffectedAccounts (uint32 ledgerSeq)
bool NetworkOPsImp::recvValidation (SerializedValidation::ref val, const std::string& source)
{
WriteLog (lsDEBUG, NetworkOPs) << "recvValidation " << val->getLedgerHash () << " from " << source;
m_journal.debug << "recvValidation " << val->getLedgerHash () << " from " << source;
return getApp().getValidations ().addValidation (val, source);
}
@@ -2270,7 +2286,7 @@ void NetworkOPsImp::pubProposedTransaction (Ledger::ref lpCurrent, SerializedTra
}
}
AcceptedLedgerTx alt (stTxn, terResult);
WriteLog (lsTRACE, NetworkOPs) << "pubProposed: " << alt.getJson ();
m_journal.trace << "pubProposed: " << alt.getJson ();
pubAccountTransaction (lpCurrent, AcceptedLedgerTx (stTxn, terResult), false);
}
@@ -2326,7 +2342,7 @@ void NetworkOPsImp::pubLedger (Ledger::ref accepted)
{
BOOST_FOREACH (const AcceptedLedger::value_type & vt, alpAccepted->getMap ())
{
WriteLog (lsTRACE, NetworkOPs) << "pubAccepted: " << vt.second->getJson ();
m_journal.trace << "pubAccepted: " << vt.second->getJson ();
pubValidatedTransaction (lpAccepted, *vt.second);
}
}
@@ -2485,7 +2501,7 @@ void NetworkOPsImp::pubAccountTransaction (Ledger::ref lpCurrent, const Accepted
}
}
}
WriteLog (lsINFO, NetworkOPs) << boost::str (boost::format ("pubAccountTransaction: iProposed=%d iAccepted=%d") % iProposed % iAccepted);
m_journal.info << boost::str (boost::format ("pubAccountTransaction: iProposed=%d iAccepted=%d") % iProposed % iAccepted);
if (!notify.empty ())
{
@@ -2515,7 +2531,7 @@ void NetworkOPsImp::subAccount (InfoSub::ref isrListener, const boost::unordered
// For the connection, monitor each account.
BOOST_FOREACH (const RippleAddress & naAccountID, vnaAccountIDs)
{
WriteLog (lsTRACE, NetworkOPs) << boost::str (boost::format ("subAccount: account: %d") % naAccountID.humanAccountID ());
m_journal.trace << boost::str (boost::format ("subAccount: account: %d") % naAccountID.humanAccountID ());
isrListener->insertSubAccountInfo (naAccountID, uLedgerIndex);
}
@@ -2756,11 +2772,11 @@ void NetworkOPsImp::getBookPage (Ledger::pointer lpLedger, const uint160& uTaker
const uint256 uBookEnd = Ledger::getQualityNext (uBookBase);
uint256 uTipIndex = uBookBase;
WriteLog (lsTRACE, NetworkOPs) << boost::str (boost::format ("getBookPage: uTakerPaysCurrencyID=%s uTakerPaysIssuerID=%s") % STAmount::createHumanCurrency (uTakerPaysCurrencyID) % RippleAddress::createHumanAccountID (uTakerPaysIssuerID));
WriteLog (lsTRACE, NetworkOPs) << boost::str (boost::format ("getBookPage: uTakerGetsCurrencyID=%s uTakerGetsIssuerID=%s") % STAmount::createHumanCurrency (uTakerGetsCurrencyID) % RippleAddress::createHumanAccountID (uTakerGetsIssuerID));
WriteLog (lsTRACE, NetworkOPs) << boost::str (boost::format ("getBookPage: uBookBase=%s") % uBookBase);
WriteLog (lsTRACE, NetworkOPs) << boost::str (boost::format ("getBookPage: uBookEnd=%s") % uBookEnd);
WriteLog (lsTRACE, NetworkOPs) << boost::str (boost::format ("getBookPage: uTipIndex=%s") % uTipIndex);
m_journal.trace << boost::str (boost::format ("getBookPage: uTakerPaysCurrencyID=%s uTakerPaysIssuerID=%s") % STAmount::createHumanCurrency (uTakerPaysCurrencyID) % RippleAddress::createHumanAccountID (uTakerPaysIssuerID));
m_journal.trace << boost::str (boost::format ("getBookPage: uTakerGetsCurrencyID=%s uTakerGetsIssuerID=%s") % STAmount::createHumanCurrency (uTakerGetsCurrencyID) % RippleAddress::createHumanAccountID (uTakerGetsIssuerID));
m_journal.trace << boost::str (boost::format ("getBookPage: uBookBase=%s") % uBookBase);
m_journal.trace << boost::str (boost::format ("getBookPage: uBookEnd=%s") % uBookEnd);
m_journal.trace << boost::str (boost::format ("getBookPage: uTipIndex=%s") % uTipIndex);
LedgerEntrySet lesActive (lpLedger, tapNONE, true);
@@ -2785,13 +2801,13 @@ void NetworkOPsImp::getBookPage (Ledger::pointer lpLedger, const uint160& uTaker
{
bDirectAdvance = false;
WriteLog (lsTRACE, NetworkOPs) << "getBookPage: bDirectAdvance";
m_journal.trace << "getBookPage: bDirectAdvance";
sleOfferDir = lesActive.entryCache (ltDIR_NODE, lpLedger->getNextLedgerIndex (uTipIndex, uBookEnd));
if (!sleOfferDir)
{
WriteLog (lsTRACE, NetworkOPs) << "getBookPage: bDone";
m_journal.trace << "getBookPage: bDone";
bDone = true;
}
else
@@ -2801,8 +2817,8 @@ void NetworkOPsImp::getBookPage (Ledger::pointer lpLedger, const uint160& uTaker
lesActive.dirFirst (uTipIndex, sleOfferDir, uBookEntry, uOfferIndex);
WriteLog (lsTRACE, NetworkOPs) << boost::str (boost::format ("getBookPage: uTipIndex=%s") % uTipIndex);
WriteLog (lsTRACE, NetworkOPs) << boost::str (boost::format ("getBookPage: uOfferIndex=%s") % uOfferIndex);
m_journal.trace << boost::str (boost::format ("getBookPage: uTipIndex=%s") % uTipIndex);
m_journal.trace << boost::str (boost::format ("getBookPage: uOfferIndex=%s") % uOfferIndex);
}
}
@@ -2828,7 +2844,7 @@ void NetworkOPsImp::getBookPage (Ledger::pointer lpLedger, const uint160& uTaker
// Found in running balance table.
saOwnerFunds = umBalanceEntry->second;
// WriteLog (lsINFO, NetworkOPs) << boost::str(boost::format("getBookPage: saOwnerFunds=%s (cached)") % saOwnerFunds.getFullText());
// m_journal.info << boost::str(boost::format("getBookPage: saOwnerFunds=%s (cached)") % saOwnerFunds.getFullText());
}
else
{
@@ -2836,7 +2852,7 @@ void NetworkOPsImp::getBookPage (Ledger::pointer lpLedger, const uint160& uTaker
saOwnerFunds = lesActive.accountHolds (uOfferOwnerID, uTakerGetsCurrencyID, uTakerGetsIssuerID);
// WriteLog (lsINFO, NetworkOPs) << boost::str(boost::format("getBookPage: saOwnerFunds=%s (new)") % saOwnerFunds.getFullText());
// m_journal.info << boost::str(boost::format("getBookPage: saOwnerFunds=%s (new)") % saOwnerFunds.getFullText());
if (saOwnerFunds.isNegative ())
{
// Treat negative funds as zero.
@@ -2874,12 +2890,12 @@ void NetworkOPsImp::getBookPage (Ledger::pointer lpLedger, const uint160& uTaker
}
else
{
// WriteLog (lsINFO, NetworkOPs) << boost::str(boost::format("getBookPage: saTakerGets=%s") % saTakerGets.getFullText());
// WriteLog (lsINFO, NetworkOPs) << boost::str(boost::format("getBookPage: saTakerPays=%s") % saTakerPays.getFullText());
// WriteLog (lsINFO, NetworkOPs) << boost::str(boost::format("getBookPage: saOwnerFunds=%s") % saOwnerFunds.getFullText());
// WriteLog (lsINFO, NetworkOPs) << boost::str(boost::format("getBookPage: saDirRate=%s") % saDirRate.getText());
// WriteLog (lsINFO, NetworkOPs) << boost::str(boost::format("getBookPage: multiply=%s") % STAmount::multiply(saTakerGetsFunded, saDirRate).getFullText());
// WriteLog (lsINFO, NetworkOPs) << boost::str(boost::format("getBookPage: multiply=%s") % STAmount::multiply(saTakerGetsFunded, saDirRate, saTakerPays).getFullText());
// m_journal.info << boost::str(boost::format("getBookPage: saTakerGets=%s") % saTakerGets.getFullText());
// m_journal.info << boost::str(boost::format("getBookPage: saTakerPays=%s") % saTakerPays.getFullText());
// m_journal.info << boost::str(boost::format("getBookPage: saOwnerFunds=%s") % saOwnerFunds.getFullText());
// m_journal.info << boost::str(boost::format("getBookPage: saDirRate=%s") % saDirRate.getText());
// m_journal.info << boost::str(boost::format("getBookPage: multiply=%s") % STAmount::multiply(saTakerGetsFunded, saDirRate).getFullText());
// m_journal.info << boost::str(boost::format("getBookPage: multiply=%s") % STAmount::multiply(saTakerGetsFunded, saDirRate, saTakerPays).getFullText());
// Only provide, if not fully funded.
@@ -2909,7 +2925,7 @@ void NetworkOPsImp::getBookPage (Ledger::pointer lpLedger, const uint160& uTaker
}
else
{
WriteLog (lsTRACE, NetworkOPs) << boost::str (boost::format ("getBookPage: uOfferIndex=%s") % uOfferIndex);
m_journal.trace << boost::str (boost::format ("getBookPage: uOfferIndex=%s") % uOfferIndex);
}
}
}
@@ -2933,13 +2949,13 @@ void NetworkOPsImp::makeFetchPack (Job&, boost::weak_ptr<Peer> wPeer,
{
if (UptimeTimer::getInstance ().getElapsedSeconds () > (uUptime + 1))
{
WriteLog (lsINFO, NetworkOPs) << "Fetch pack request got stale";
m_journal.info << "Fetch pack request got stale";
return;
}
if (getApp().getFeeTrack ().isLoadedLocal ())
{
WriteLog (lsINFO, NetworkOPs) << "Too busy to make fetch pack";
m_journal.info << "Too busy to make fetch pack";
return;
}
@@ -2986,13 +3002,13 @@ void NetworkOPsImp::makeFetchPack (Job&, boost::weak_ptr<Peer> wPeer,
}
while (wantLedger && (UptimeTimer::getInstance ().getElapsedSeconds () <= (uUptime + 1)));
WriteLog (lsINFO, NetworkOPs) << "Built fetch pack with " << reply.objects ().size () << " nodes";
m_journal.info << "Built fetch pack with " << reply.objects ().size () << " nodes";
PackedMessage::pointer msg = boost::make_shared<PackedMessage> (reply, protocol::mtGET_OBJECTS);
peer->sendPacket (msg, false);
}
catch (...)
{
WriteLog (lsWARNING, NetworkOPs) << "Exception building fetch pach";
m_journal.warning << "Exception building fetch pach";
}
}
@@ -3017,7 +3033,7 @@ bool NetworkOPsImp::getFetchPack (uint256 const& hash, Blob& data)
if (hash != Serializer::getSHA512Half (data))
{
WriteLog (lsWARNING, NetworkOPs) << "Bad entry in fetch pack";
m_journal.warning << "Bad entry in fetch pack";
return false;
}
@@ -3048,19 +3064,28 @@ void NetworkOPsImp::missingNodeInLedger (uint32 seq)
uint256 hash = getApp().getLedgerMaster ().getHashBySeq (seq);
if (hash.isZero())
{
WriteLog (lsWARNING, NetworkOPs) << "Missing a node in ledger " << seq << " cannot fetch";
m_journal.warning << "Missing a node in ledger " << seq << " cannot fetch";
}
else
{
WriteLog (lsWARNING, NetworkOPs) << "Missing a node in ledger " << seq << " fetching";
m_journal.warning << "Missing a node in ledger " << seq << " fetching";
getApp().getInboundLedgers ().findCreate (hash, seq, false);
}
}
//------------------------------------------------------------------------------
NetworkOPs* NetworkOPs::New (LedgerMaster& ledgerMaster)
NetworkOPs::NetworkOPs (Service& parent)
: InfoSub::Source ("NetworkOPs", parent)
{
ScopedPointer <NetworkOPs> object (new NetworkOPsImp (ledgerMaster));
}
//------------------------------------------------------------------------------
// Factory: creates the concrete NetworkOPsImp and returns it as the
// abstract NetworkOPs interface. Ownership passes to the caller.
// The ScopedPointer guards against a leak if construction logic grows;
// release() hands the raw pointer out once construction succeeded.
NetworkOPs* NetworkOPs::New (LedgerMaster& ledgerMaster,
Service& parent, Journal journal)
{
ScopedPointer <NetworkOPs> object (new NetworkOPsImp (
ledgerMaster, parent, journal));
return object.release ();
}

View File

@@ -36,8 +36,12 @@ class LedgerConsensus;
instances of rippled will need to be hardened to protect against hostile
or unreliable servers.
*/
class NetworkOPs : public InfoSub::Source
class NetworkOPs
: public InfoSub::Source
{
protected:
explicit NetworkOPs (Service& parent);
public:
enum Fault
{
@@ -63,7 +67,8 @@ public:
public:
// VFALCO TODO Make LedgerMaster a SharedPtr or a reference.
//
static NetworkOPs* New (LedgerMaster& ledgerMaster);
static NetworkOPs* New (LedgerMaster& ledgerMaster,
Service& parent, Journal journal);
virtual ~NetworkOPs () { }

View File

@@ -33,10 +33,11 @@ static int s_nodeStoreDBCount = NUMBER (s_nodeStoreDBInit);
class SqliteBackendFactory::Backend : public NodeStore::Backend
{
public:
Backend (size_t keyBytes, std::string const& path)
Backend (size_t keyBytes, std::string const& path, NodeStore::Scheduler& scheduler)
: m_keyBytes (keyBytes)
, m_name (path)
, m_db (new DatabaseCon(path, s_nodeStoreDBInit, s_nodeStoreDBCount))
, m_scheduler (scheduler)
{
String s;
@@ -164,6 +165,11 @@ public:
return 0;
}
// Asynchronous-stop hook for this backend: informs the NodeStore
// scheduler that no further scheduled tasks will run.
// NOTE(review): this only notifies the scheduler; any in-flight
// database work is presumably quiesced elsewhere — confirm.
void stopAsync ()
{
m_scheduler.scheduledTasksStopped ();
}
//--------------------------------------------------------------------------
void doBind (SqliteStatement& statement, NodeObject::ref object)
@@ -205,6 +211,7 @@ private:
size_t const m_keyBytes;
std::string const m_name;
ScopedPointer <DatabaseCon> m_db;
NodeStore::Scheduler& m_scheduler;
};
//------------------------------------------------------------------------------
@@ -232,5 +239,5 @@ NodeStore::Backend* SqliteBackendFactory::createInstance (
StringPairArray const& keyValues,
NodeStore::Scheduler& scheduler)
{
return new Backend (keyBytes, keyValues ["path"].toStdString ());
return new Backend (keyBytes, keyValues ["path"].toStdString (), scheduler);
}

View File

@@ -358,7 +358,7 @@ bool PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
return true;
}
void PathRequest::updateAll (Ledger::ref ledger, bool newOnly)
void PathRequest::updateAll (Ledger::ref ledger, bool newOnly, CancelCallback shouldCancel)
{
std::set<wptr> requests;
@@ -374,6 +374,9 @@ void PathRequest::updateAll (Ledger::ref ledger, bool newOnly)
BOOST_FOREACH (wref wRequest, requests)
{
if (shouldCancel())
break;
bool remove = true;
PathRequest::pointer pRequest = wRequest.lock ();

View File

@@ -41,7 +41,7 @@ public:
bool doUpdate (const boost::shared_ptr<RippleLineCache>&, bool fast); // update jvStatus
static void updateAll (const boost::shared_ptr<Ledger>& ledger, bool newOnly);
static void updateAll (const boost::shared_ptr<Ledger>& ledger, bool newOnly, CancelCallback shouldCancel);
private:
void setValid ();

View File

@@ -6,12 +6,15 @@
SETUP_LOG (PeerDoor)
class PeerDoorImp : public PeerDoor, LeakChecked <PeerDoorImp>
class PeerDoorImp
: public PeerDoor
, public LeakChecked <PeerDoorImp>
{
public:
PeerDoorImp (Kind kind, std::string const& ip, int port,
PeerDoorImp (Service& parent, Kind kind, std::string const& ip, int port,
boost::asio::io_service& io_service, boost::asio::ssl::context& ssl_context)
: m_kind (kind)
: PeerDoor (parent)
, m_kind (kind)
, m_ssl_context (ssl_context)
, mAcceptor (io_service, boost::asio::ip::tcp::endpoint (
boost::asio::ip::address ().from_string (ip.empty () ? "0.0.0.0" : ip), port))
@@ -20,13 +23,20 @@ public:
if (! ip.empty () && port != 0)
{
Log (lsINFO) << "Peer port: " << ip << " " << port;
startListening ();
async_accept ();
}
}
~PeerDoorImp ()
{
}
//--------------------------------------------------------------------------
void startListening ()
// Initiating function for performing an asynchronous accept
//
void async_accept ()
{
bool const isInbound (true);
bool const requirePROXYHandshake (m_kind == sslAndPROXYRequired);
@@ -37,40 +47,67 @@ public:
isInbound, requirePROXYHandshake));
mAcceptor.async_accept (new_connection->getNativeSocket (),
boost::bind (&PeerDoorImp::handleConnect, this, new_connection,
boost::asio::placeholders::error));
boost::bind (&PeerDoorImp::handleAccept, this,
boost::asio::placeholders::error,
new_connection));
}
//--------------------------------------------------------------------------
void handleConnect (Peer::pointer new_connection,
boost::system::error_code const& error)
// Called when the deadline timer wait completes
//
// Completion handler for the retry delay timer: once the wait ends,
// resume accepting connections.
// NOTE(review): ec is ignored, so a cancelled timer also re-arms the
// accept loop — confirm that is intended during shutdown.
void handleTimer (boost::system::error_code ec)
{
async_accept ();
}
// Called when the accept socket wait completes
//
void handleAccept (boost::system::error_code ec, Peer::pointer new_connection)
{
bool delay = false;
if (!error)
if (! ec)
{
new_connection->connected (error);
// VFALCO NOTE the error code doesnt seem to be used in connected()
new_connection->connected (ec);
}
else
{
if (error == boost::system::errc::too_many_files_open)
if (ec == boost::system::errc::too_many_files_open)
delay = true;
WriteLog (lsERROR, PeerDoor) << error;
WriteLog (lsERROR, PeerDoor) << ec;
}
if (delay)
{
mDelayTimer.expires_from_now (boost::posix_time::milliseconds (500));
mDelayTimer.async_wait (boost::bind (&PeerDoorImp::startListening, this));
mDelayTimer.async_wait (boost::bind (&PeerDoorImp::handleTimer,
this, boost::asio::placeholders::error));
}
else
{
startListening ();
async_accept ();
}
}
//--------------------------------------------------------------------------
// Service stop notification: cancel all pending asynchronous
// operations (retry timer and accept), then report this service
// as stopped. Errors from cancel() are deliberately ignored via
// the error_code overloads so stop never throws.
void onServiceStop ()
{
{
boost::system::error_code ec;
mDelayTimer.cancel (ec);
}
{
boost::system::error_code ec;
mAcceptor.cancel (ec);
}
serviceStopped ();
}
private:
Kind m_kind;
boost::asio::ssl::context& m_ssl_context;
@@ -80,8 +117,15 @@ private:
//------------------------------------------------------------------------------
PeerDoor* PeerDoor::New (Kind kind, std::string const& ip, int port,
// Registers this PeerDoor as an AsyncService child of parent so it
// participates in the service-tree shutdown sequence.
PeerDoor::PeerDoor (Service& parent)
: AsyncService ("PeerDoor", parent)
{
}
//------------------------------------------------------------------------------
PeerDoor* PeerDoor::New (Service& parent, Kind kind, std::string const& ip, int port,
boost::asio::io_service& io_service, boost::asio::ssl::context& ssl_context)
{
return new PeerDoorImp (kind, ip, port, io_service, ssl_context);
return new PeerDoorImp (parent, kind, ip, port, io_service, ssl_context);
}

View File

@@ -7,10 +7,12 @@
#ifndef RIPPLE_PEERDOOR_H_INCLUDED
#define RIPPLE_PEERDOOR_H_INCLUDED
/** Handles incoming connections from peers.
*/
class PeerDoor : LeakChecked <PeerDoor>
/** Handles incoming connections from peers. */
class PeerDoor : public AsyncService
{
protected:
explicit PeerDoor (Service& parent);
public:
virtual ~PeerDoor () { }
@@ -20,7 +22,7 @@ public:
sslAndPROXYRequired
};
static PeerDoor* New (Kind kind, std::string const& ip, int port,
static PeerDoor* New (Service& parent, Kind kind, std::string const& ip, int port,
boost::asio::io_service& io_service, boost::asio::ssl::context& ssl_context);
//virtual boost::asio::ssl::context& getSSLContext () = 0;

View File

@@ -8,7 +8,8 @@ SETUP_LOG (Peers)
class PeersImp
: public Peers
, LeakChecked <PeersImp>
, public Service
, public LeakChecked <PeersImp>
{
public:
enum
@@ -18,8 +19,11 @@ public:
policyIntervalSeconds = 5
};
PeersImp (boost::asio::io_service& io_service, boost::asio::ssl::context& ssl_context)
: m_io_service (io_service)
PeersImp (Service& parent,
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context)
: Service ("Peers", parent)
, m_io_service (io_service)
, m_ssl_context (ssl_context)
, mPeerLock (this, "PeersImp", __FILE__, __LINE__)
, mLastPeer (0)
@@ -907,8 +911,10 @@ void PeersImp::scanRefresh ()
//------------------------------------------------------------------------------
Peers* Peers::New (boost::asio::io_service& io_service, boost::asio::ssl::context& ssl_context)
Peers* Peers::New (Service& parent,
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context)
{
return new PeersImp (io_service, ssl_context);
return new PeersImp (parent, io_service, ssl_context);
}

View File

@@ -12,7 +12,8 @@
class Peers
{
public:
static Peers* New (boost::asio::io_service& io_service,
static Peers* New (Service& parent,
boost::asio::io_service& io_service,
boost::asio::ssl::context& context);
virtual ~Peers () { }

View File

@@ -89,8 +89,9 @@ private:
typedef boost::unordered_map<std::pair< std::string, int>, score> epScore;
public:
UniqueNodeListImp ()
: mFetchLock (this, "Fetch", __FILE__, __LINE__)
explicit UniqueNodeListImp (Service& parent)
: UniqueNodeList (parent)
, mFetchLock (this, "Fetch", __FILE__, __LINE__)
, mUNLLock (this, "UNL", __FILE__, __LINE__)
, m_scoreTimer (this)
, mFetchActive (0)
@@ -100,6 +101,16 @@ public:
//--------------------------------------------------------------------------
// Service stop notification: cancel the fetch and score deadline
// timers so no further periodic work is scheduled, then report
// this service as stopped.
void onServiceStop ()
{
m_fetchTimer.cancel ();
m_scoreTimer.cancel ();
serviceStopped ();
}
//--------------------------------------------------------------------------
void doScore ()
{
mtpScoreNext = boost::posix_time::ptime (boost::posix_time::not_a_date_time); // Timer not set.
@@ -1150,17 +1161,6 @@ private:
//--------------------------------------------------------------------------
// Begin scoring if timer was not cancelled.
void scoreTimerHandler (const boost::system::error_code& err)
{
if (!err)
{
onDeadlineTimer (m_scoreTimer);
}
}
//--------------------------------------------------------------------------
// Start a timer to update scores.
// <-- bNow: true, to force scoring for debugging.
void scoreNext (bool bNow)
@@ -2052,9 +2052,6 @@ private:
% getConfig ().VALIDATORS_BASE);
}
}
//--------------------------------------------------------------------------
private:
typedef RippleMutex FetchLockType;
typedef FetchLockType::ScopedLockType ScopedFetchLockType;
@@ -2085,7 +2082,16 @@ private:
std::map<RippleAddress, ClusterNodeStatus> m_clusterNodes;
};
UniqueNodeList* UniqueNodeList::New ()
//------------------------------------------------------------------------------
UniqueNodeList::UniqueNodeList (Service& parent)
: Service ("UniqueNodeList", parent)
{
return new UniqueNodeListImp ();
}
//------------------------------------------------------------------------------
// Factory: creates the concrete implementation, wired into the
// service tree under parent. Ownership passes to the caller.
UniqueNodeList* UniqueNodeList::New (Service& parent)
{
return new UniqueNodeListImp (parent);
}

View File

@@ -7,8 +7,11 @@
#ifndef RIPPLE_UNIQUENODELIST_H_INCLUDED
#define RIPPLE_UNIQUENODELIST_H_INCLUDED
class UniqueNodeList
class UniqueNodeList : public Service
{
protected:
explicit UniqueNodeList (Service& parent);
public:
enum ValidatorSource
{
@@ -26,7 +29,7 @@ public:
public:
// VFALCO TODO make this not use boost::asio...
static UniqueNodeList* New ();
static UniqueNodeList* New (Service& parent);
virtual ~UniqueNodeList () { }

View File

@@ -11,6 +11,7 @@
#include <boost/bimap/list_of.hpp>
#include <boost/bimap/multiset_of.hpp>
#include <boost/bimap/unordered_set_of.hpp>
#include <boost/optional.hpp>
#include "ripple_app.h"
@@ -29,6 +30,9 @@ namespace ripple
// Application
//
# include "main/IoServicePool.h"
#include "main/IoServicePool.cpp"
# include "main/FatalErrorReporter.h"
#include "main/FatalErrorReporter.cpp"

View File

@@ -28,7 +28,8 @@ public:
WSDoorImp (InfoSub::Source& source,
std::string const& strIp, int iPort, bool bPublic,
boost::asio::ssl::context& ssl_context)
: Thread ("websocket")
: WSDoor (source)
, Thread ("websocket")
, m_source (source)
, m_ssl_context (ssl_context)
, m_endpointLock (this, "WSDoor", __FILE__, __LINE__)
@@ -41,15 +42,7 @@ public:
~WSDoorImp ()
{
{
ScopedLockType lock (m_endpointLock, __FILE__, __LINE__);
if (m_endpoint != nullptr)
m_endpoint->stop ();
}
signalThreadShouldExit ();
waitForThreadToExit ();
stopThread ();
}
private:
@@ -101,6 +94,23 @@ private:
m_endpoint = nullptr;
}
serviceStopped ();
}
// Service stop notification: stop the websocket endpoint (under the
// endpoint lock) and ask the listener thread to exit. serviceStopped()
// is signalled elsewhere once the thread actually finishes.
void onServiceStop ()
{
{
ScopedLockType lock (m_endpointLock, __FILE__, __LINE__);
// VFALCO NOTE we probably dont want to block here
// but websocketpp is deficient and broken.
//
if (m_endpoint != nullptr)
m_endpoint->stop ();
}
signalThreadShouldExit ();
}
private:
@@ -119,6 +129,13 @@ private:
//------------------------------------------------------------------------------
// Registers this WSDoor as a Service child of parent so it
// participates in the service-tree shutdown sequence.
WSDoor::WSDoor (Service& parent)
: Service ("WSDoor", parent)
{
}
//------------------------------------------------------------------------------
WSDoor* WSDoor::New (InfoSub::Source& source, std::string const& strIp,
int iPort, bool bPublic, boost::asio::ssl::context& ssl_context)
{

View File

@@ -8,9 +8,14 @@
#define RIPPLE_WSDOOR_RIPPLEHEADER
/** Handles accepting incoming WebSocket connections. */
class WSDoor
class WSDoor : public Service
{
protected:
explicit WSDoor (Service& parent);
public:
virtual ~WSDoor () { }
static WSDoor* New (InfoSub::Source& source, std::string const& strIp,
int iPort, bool bPublic, boost::asio::ssl::context& ssl_context);
};

View File

@@ -93,17 +93,18 @@ void logTimedDestroy (
//------------------------------------------------------------------------------
/** Log a timed function call if the time exceeds a threshold. */
template <typename PartitionKey, typename Function>
void logTimedCall (String description, char const* fileName, int lineNumber,
template <typename Function>
void logTimedCall (Journal::Stream stream,
String description,
char const* fileName,
int lineNumber,
Function f, double thresholdSeconds = 1)
{
double const seconds = measureFunctionCallTime (f);
if (seconds > thresholdSeconds)
{
LogSeverity const severity = lsWARNING;
Log (severity, LogPartition::get <PartitionKey> ()) <<
stream <<
description << " took "<<
String (detail::cleanElapsed (seconds)) <<
" seconds to execute at " <<

View File

@@ -89,6 +89,7 @@ namespace ripple
#include "utility/CountedObject.cpp"
#include "utility/DiffieHellmanUtil.cpp"
#include "utility/IniFile.cpp"
#include "utility/Service.cpp"
#include "utility/StringUtilities.cpp"
#include "utility/Sustain.cpp"
#include "utility/ThreadName.cpp"

View File

@@ -108,6 +108,7 @@ using namespace beast;
#include "utility/IniFile.h"
#include "utility/PlatformMacros.h"
#include "utility/RandomNumbers.h"
#include "utility/Service.h"
#include "utility/StringUtilities.h"
#include "utility/Sustain.h"
#include "utility/ThreadName.h"

View File

@@ -35,5 +35,7 @@ typedef UnsignedInteger <33> RipplePublicKey;
/** A container holding the hash of a public key in binary format. */
typedef UnsignedInteger <20> RipplePublicKeyHash;
/** A callback used to check for canceling an operation. */
typedef SharedFunction <bool(void)> CancelCallback;
#endif

View File

@@ -0,0 +1,200 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
/** Construct a root Service (one with no parent).
    The service is created in the non-stopped state: every stop-related
    flag starts false. m_child is this object's intrusive list node;
    a root has no parent list to link it into.
*/
Service::Service (char const* name)
: m_name (name)
, m_root (true)
, m_child (this)
, m_calledServiceStop (false)
, m_stopped (false)
, m_childrenStopped (false)
{
}
/** Construct a Service with an optional parent.
    A null parent makes this a root service; a non-null parent makes
    this a child registered in the parent's child list.

    Bug fix: m_root must be true exactly when there is NO parent.
    The single-argument constructor sets m_root(true) and the
    reference-parent constructor sets m_root(false); serviceStop()
    and serviceStopAsync() assert m_root. The initializer previously
    used the inverted test (parent != nullptr), which marked parented
    services as roots and null-parent services as non-roots.
*/
Service::Service (char const* name, Service* parent)
: m_name (name)
, m_root (parent == nullptr)
, m_child (this)
, m_calledServiceStop (false)
, m_stopped (false)
, m_childrenStopped (false)
{
    if (parent != nullptr)
    {
        // A parent that has been told to stop may not acquire children.
        bassert (! parent->isServiceStopping());
        parent->m_children.push_front (&m_child);
    }
}
/** Construct a child Service whose parent is passed by reference.
    The child links itself into the parent's intrusive child list;
    the parent must not already have been told to stop.
*/
Service::Service (char const* name, Service& parent)
: m_name (name)
, m_root (false)
, m_child (this)
, m_calledServiceStop (false)
, m_stopped (false)
, m_childrenStopped (false)
{
// must not have had stop called
bassert (! parent.isServiceStopping());
parent.m_children.push_front (&m_child);
}
/** Destroy the service.
    Destroying a Service that has not fully stopped (itself and all of
    its children) is a programming error, caught by assert in debug
    builds.
*/
Service::~Service ()
{
// must be stopped
bassert (m_stopped);
// children must be stopped
bassert (m_childrenStopped);
}
/** Returns the name passed at construction (pointer is not copied). */
char const* Service::serviceName () const
{
return m_name;
}
/** Synchronous stop: notify the whole tree, then block until it stops.
    Only the first call has any effect; must be invoked on a root
    service. The notification phase (serviceStopAsync) runs before the
    blocking phase (stopRecursive), which waits leaves-first.
    @param stream Optional Journal stream for progress reporting.
*/
void Service::serviceStop (Journal::Stream stream)
{
// may only be called once
if (m_calledServiceStop)
return;
m_calledServiceStop = true;
// must be called from a root service
bassert (m_root);
// send the notification
serviceStopAsync ();
// now block on the tree of Service objects from the leaves up.
stopRecursive (stream);
}
/** Asynchronous stop: notify the whole tree without blocking.
    Must be invoked on a root service; duplicate calls are filtered
    inside stopAsyncRecursive via an atomic compare-and-set.
*/
void Service::serviceStopAsync ()
{
// must be called from a root service
bassert (m_root);
stopAsyncRecursive ();
}
/** Returns `true` once the stop notification has been issued to this
    service (the atomic flag set in stopAsyncRecursive).
*/
bool Service::isServiceStopping ()
{
return m_calledStopAsync.get() != 0;
}
/** Returns `true` after stopRecursive has observed this service's
    stopped event (set in serviceStopped).
*/
bool Service::isServiceStopped ()
{
return m_stopped;
}
/** Returns `true` once every child subtree has completed its stop
    (set by stopRecursive after recursing into all children).
*/
bool Service::areServiceChildrenStopped ()
{
return m_childrenStopped;
}
/** Called by the derived class to report that its stop has completed.
    Signals the event that stopRecursive (and thus serviceStop) blocks on.
*/
void Service::serviceStopped ()
{
m_stoppedEvent.signal();
}
/** Default stop handler: a service with no cleanup of its own simply
    reports itself stopped immediately.
*/
void Service::onServiceStop()
{
serviceStopped();
}
/** Default children-stopped handler: no action required. */
void Service::onServiceChildrenStopped ()
{
}
//------------------------------------------------------------------------------
void Service::stopAsyncRecursive ()
{
// make sure we only do this once
if (m_root)
{
// if this fails, some other thread got to it first
if (! m_calledStopAsync.compareAndSetBool (1, 0))
return;
}
else
{
// can't possibly already be set
bassert (m_calledStopAsync.get() == 0);
m_calledStopAsync.set (1);
}
// notify this service
onServiceStop ();
// notify children
for (Children::const_iterator iter (m_children.cbegin ());
iter != m_children.cend(); ++iter)
{
iter->service->stopAsyncRecursive();
}
}
/** Block until this service and its entire subtree have stopped.
    Recurses depth-first so leaves are waited on before their parents,
    then waits on this service's own stopped event, logging to the
    Journal stream if the wait exceeds one second.
*/
void Service::stopRecursive (Journal::Stream stream)
{
// Block on each child recursively. Thinking of the Service
// hierarchy as a tree with the root at the top, we will block
// first on leaves, and then at each successively higher level.
//
for (Children::const_iterator iter (m_children.cbegin ());
iter != m_children.cend(); ++iter)
{
iter->service->stopRecursive (stream);
}
// Once we get here, we either have no children, or all of
// our children have stopped, so update state accordingly.
//
m_childrenStopped = true;
// Notify derived class that children have stopped.
onServiceChildrenStopped ();
// Block until this service stops. First we do a timed wait of 1 second, and
// if that times out we report to the Journal and then do an infinite wait.
//
bool const timedOut (! m_stoppedEvent.wait (1 * 1000)); // milliseconds
if (timedOut)
{
stream << "Service: Waiting for '" << serviceName() << "' to stop";
m_stoppedEvent.wait ();
}
// once we get here, we know the service has stopped.
m_stopped = true;
}
//------------------------------------------------------------------------------
/** Construct a scoped root service; stop happens in the destructor. */
ScopedService::ScopedService (char const* name)
: Service (name)
{
}
/** Destroy the scoped service, performing the blocking stop so the
    base class destructor's "already stopped" asserts are satisfied.
*/
ScopedService::~ScopedService ()
{
serviceStop();
}
/** Stop handler: nothing to clean up, so report stopped at once. */
void ScopedService::onServiceStop ()
{
serviceStopped();
}
/** Children-stopped handler: no action required. */
void ScopedService::onServiceChildrenStopped ()
{
}

View File

@@ -0,0 +1,275 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_SERVICE_H_INCLUDED
#define RIPPLE_BASICS_SERVICE_H_INCLUDED
/** Abstraction for organizing partitioned support code.
The main thing a service can do, is to stop. Once it stops it cannot be
reused, it can only be destroyed. This interface is used to coordinate
the complex activities required for a clean exit in the presence of
pending asynchronous I/O and multiple threads.
This is the sequence of events involved in stopping a service:
1. serviceStopAsync() [optional]
This notifies the Service and all its children that a stop is requested.
2. serviceStop ()
This first calls serviceStopAsync(), and then blocks on each Service
in the tree from the bottom up, until the Service indicates it has
stopped. This will usually be called from the main thread of execution
when some external signal indicates that the process should stop.
For example, an RPC 'stop' command, or a SIGINT POSIX signal.
3. onServiceStop ()
This is called for the root Service and all its children when a stop
is requested. Derived classes should cancel pending I/O and timers,
signal that threads should exit, queue cleanup jobs, and perform any
other necessary clean up in preparation for exit.
4. onServiceChildrenStopped ()
When all the children of a service have stopped, this will be called.
This informs the Service that there should not be any more dependents
making calls into the derived class member functions. A Service that
has no children will have this function called immediately.
5. serviceStopped ()
The derived class calls this function to inform the Service API that
it has completed the stop. This unblocks the caller of serviceStop().
For services which are only considered stopped when all of their children
have stopped, and their own internal logic indicates a stop, it will be
necessary to perform special actions in onServiceChildrenStopped(). The
function areServiceChildrenStopped() can be used after children have
stopped, but before the Service logic itself has stopped, to determine
if the stopped service logic is a true stop.
Pseudo code for this process is as follows:
@code
// Returns `true` if derived logic has stopped.
//
// When the logic stops, logicProcessingStop() is no longer called.
// If children are still active we need to wait until we get a
// notification that the children have stopped.
//
bool logicHasStopped ();
// Called when children have stopped
void onServiceChildrenStopped ()
{
// We have stopped when the derived logic stops and children stop.
if (logicHasStopped())
serviceStopped();
}
// derived-specific logic that executes periodically
void logicProcessingStep ()
{
// do the step
// ...
// now see if we've stopped
if (logicHasStopped() && areServiceChildrenStopped())
serviceStopped();
}
@endcode
*/
class Service
{
public:
/** Create a service.
Services are always created in a non-stopped state.
A service without a parent is a root service.
*/
/** @{ */
explicit Service (char const* name);
Service (char const* name, Service* parent);
Service (char const* name, Service& parent);
/** @} */
/** Destroy the service.
Undefined behavior results if the service is not first stopped.
In general, services are not allowed to be created and destroyed
dynamically. The set of services should be static at some point
after the initialization of the process. If you need a dynamic
service, consider having a static Service which marshals service
calls to a second custom interface.
*/
virtual ~Service ();
/** Returns the name of the service. */
char const* serviceName () const;
/** Notify a root service and its children to stop, and block until stopped.
If the service was already notified, it is not notified again.
The call blocks until the service and all of its children have stopped.
Thread safety:
Safe to call from any thread not associated with a Service.
This function may only be called once.
@param stream An optional Journal stream on which to log progress.
*/
void serviceStop (Journal::Stream stream = Journal::Stream());
/** Notify a root service and children to stop, without waiting.
If the service was already notified, it is not notified again.
While this is safe to call more than once, only the first call
has any effect.
Thread safety:
Safe to call from any thread at any time.
*/
void serviceStopAsync ();
/** Returns `true` if the service should stop.
Call from the derived class to determine if a long-running
operation should be canceled.
Note that this is not appropriate for either threads, or asynchronous
I/O. For threads, use the thread-specific facilities available to
inform the thread that it should exit. For asynchronous I/O, cancel
all pending operations inside the onServiceStop override.
Thread safety:
Safe to call from any thread at any time.
@see onServiceStop
*/
bool isServiceStopping ();
/** Returns `true` if the service has stopped.
Thread safety:
Safe to call from any thread at any time.
*/
bool isServiceStopped ();
/** Returns `true` if all children have stopped.
Children of services with no children are considered stopped if
the service has been notified.
Thread safety:
Safe to call from any thread at any time.
*/
bool areServiceChildrenStopped ();
/** Called by derived classes to indicate that the service has stopped.
The derived class must call this either after isServiceStopping
returns `true`, or when onServiceStop is called, or else a call
to serviceStop will never return.
Thread safety:
Safe to call from any thread at any time.
*/
void serviceStopped ();
/** Called when the stop notification is issued.
The call is made on an unspecified, implementation-specific thread.
onServiceStop and onServiceChildrenStopped will never be called
concurrently, across all Service objects descended from the same root,
inclusive of the root.
It is safe to call isServiceStopping, isServiceStopped, and
areServiceChildrenStopped from within this function; The values
returned will always be valid and never change during the callback.
The default implementation simply calls serviceStopped(). This is
applicable when the Service has a trivial stop operation (or no
stop operation), and we are merely using the Service API to position
it as a dependency of some parent service.
Thread safety:
May not block for long periods.
Guaranteed only to be called once.
Must be safe to call from any thread at any time.
*/
virtual void onServiceStop ();
/** Called when all children of a service have stopped.
The call is made on an unspecified, implementation-specific thread.
onServiceStop and onServiceChildrenStopped will never be called
concurrently, across all Service objects descended from the same root,
inclusive of the root.
It is safe to call isServiceStopping, isServiceStopped, and
areServiceChildrenStopped from within this function; The values
returned will always be valid and never change during the callback.
Thread safety:
May not block for long periods.
Guaranteed only to be called once.
Must be safe to call from any thread at any time.
*/
virtual void onServiceChildrenStopped ();
private:
struct Child;
typedef LockFreeStack <Child> Children;
// Intrusive list node. Each Service owns one (m_child), so linking
// into a parent's child list requires no allocation.
struct Child : Children::Node
{
Child (Service* service_) : service (service_)
{
}
Service* service;
};
// Implementation of the two stop phases (see serviceStop).
void stopAsyncRecursive ();
void stopRecursive (Journal::Stream stream);
char const* m_name;
// `true` when this service has no parent.
bool m_root;
Child m_child;
Children m_children;
// Flag that we called serviceStop. This is for diagnostics.
bool m_calledServiceStop;
// Atomic flag to make sure we only call serviceStopAsync once.
Atomic <int> m_calledStopAsync;
// Flag that this service stopped. Never goes back to false.
bool volatile m_stopped;
// Flag that all children have stopped (recursive). Never goes back to false.
bool volatile m_childrenStopped;
// serviceStop() blocks on this event until serviceStopped() is called.
WaitableEvent m_stoppedEvent;
};
//------------------------------------------------------------------------------
/** A root Service with a short scope.
This Service takes care of stopping automatically: its destructor
performs the blocking serviceStop(), so no additional action is
required by users.
*/
class ScopedService : public Service
{
public:
explicit ScopedService (char const* name);
// Calls serviceStop(), blocking until this service and its
// children have stopped.
~ScopedService ();
// Trivial stop: immediately reports itself stopped.
void onServiceStop ();
void onServiceChildrenStopped ();
};
#endif

View File

@@ -16,12 +16,24 @@ Job::Job (JobType type, uint64 index)
{
}
Job::Job (Job const& other)
: m_cancelCallback (other.m_cancelCallback)
, mType (other.mType)
, mJobIndex (other.mJobIndex)
, mJob (other.mJob)
, m_loadEvent (other.m_loadEvent)
, mName (other.mName)
{
}
Job::Job (JobType type,
std::string const& name,
uint64 index,
LoadMonitor& lm,
FUNCTION_TYPE <void (Job&)> const& job)
: mType (type)
FUNCTION_TYPE <void (Job&)> const& job,
CancelCallback cancelCallback)
: m_cancelCallback (cancelCallback)
, mType (type)
, mJobIndex (index)
, mJob (job)
, mName (name)
@@ -29,11 +41,35 @@ Job::Job (JobType type,
m_loadEvent = boost::make_shared <LoadEvent> (boost::ref (lm), name, false);
}
Job& Job::operator= (Job const& other)
{
mType = other.mType;
mJobIndex = other.mJobIndex;
mJob = other.mJob;
m_loadEvent = other.m_loadEvent;
mName = other.mName;
m_cancelCallback = other.m_cancelCallback;
return *this;
}
JobType Job::getType () const
{
return mType;
}
CancelCallback Job::getCancelCallback () const
{
bassert (! m_cancelCallback.empty());
return m_cancelCallback;
}
bool Job::shouldCancel () const
{
if (! m_cancelCallback.empty ())
return m_cancelCallback ();
return false;
}
void Job::doJob ()
{
m_loadEvent->reName (mName);

View File

@@ -67,6 +67,8 @@ public:
//
Job ();
Job (Job const& other);
Job (JobType type, uint64 index);
// VFALCO TODO try to remove the dependency on LoadMonitor.
@@ -74,10 +76,18 @@ public:
std::string const& name,
uint64 index,
LoadMonitor& lm,
FUNCTION_TYPE <void (Job&)> const& job);
FUNCTION_TYPE <void (Job&)> const& job,
CancelCallback cancelCallback);
Job& operator= (Job const& other);
JobType getType () const;
CancelCallback getCancelCallback () const;
/** Returns `true` if the running job should make a best-effort cancel. */
bool shouldCancel () const;
void doJob ();
void rename (const std::string& n);
@@ -93,6 +103,7 @@ public:
static const char* toString (JobType);
private:
CancelCallback m_cancelCallback;
JobType mType;
uint64 mJobIndex;
FUNCTION_TYPE <void (Job&)> mJob;

View File

@@ -4,38 +4,64 @@
*/
//==============================================================================
SETUP_LOG (JobQueue)
//------------------------------------------------------------------------------
JobQueue::Count::Count () noexcept
class JobQueueImp
: public JobQueue
, private Workers::Callback
{
public:
// Statistics for each JobType
//
struct Count
{
Count () noexcept
: type (jtINVALID)
, waiting (0)
, running (0)
, deferred (0)
{
}
{
}
JobQueue::Count::Count (JobType type_) noexcept
Count (JobType type_) noexcept
: type (type_)
, waiting (0)
, running (0)
, deferred (0)
{
}
{
}
//------------------------------------------------------------------------------
JobType type; // The type of Job these counts reflect
int waiting; // The number waiting
int running; // How many are running
int deferred; // Number of jobs we didn't signal due to limits
};
JobQueue::State::State ()
: lastJob (0)
{
}
typedef std::set <Job> JobSet;
typedef std::map <JobType, Count> MapType;
typedef CriticalSection::ScopedLockType ScopedLock;
//------------------------------------------------------------------------------
Journal m_journal;
CriticalSection m_mutex;
uint64 m_lastJob;
JobSet m_jobSet;
MapType m_jobCounts;
JobQueue::JobQueue ()
: m_workers (*this, "JobQueue", 0)
{
// The number of jobs running through processTask()
int m_processCount;
Workers m_workers;
LoadMonitor m_loads [NUM_JOB_TYPES];
CancelCallback m_cancelCallback;
//--------------------------------------------------------------------------
JobQueueImp (Service& parent, Journal journal)
: JobQueue ("JobQueue", parent)
, m_journal (journal)
, m_lastJob (0)
, m_processCount (0)
, m_workers (*this, "JobQueue", 0)
, m_cancelCallback (boost::bind (&Service::isServiceStopping, this))
{
{
ScopedLock lock (m_mutex);
@@ -44,113 +70,187 @@ JobQueue::JobQueue ()
for (int i = 0; i < NUM_JOB_TYPES; ++i)
{
JobType const type (static_cast <JobType> (i));
m_state.jobCounts [type] = Count (type);
m_jobCounts [type] = Count (type);
}
}
mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000);
mJobLoads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000);
mJobLoads [ jtPROOFWORK ].setTargetLatency (2000, 5000);
mJobLoads [ jtTRANSACTION ].setTargetLatency (250, 1000);
mJobLoads [ jtPROPOSAL_ut ].setTargetLatency (500, 1250);
mJobLoads [ jtPUBLEDGER ].setTargetLatency (3000, 4500);
mJobLoads [ jtWAL ].setTargetLatency (1000, 2500);
mJobLoads [ jtVALIDATION_t ].setTargetLatency (500, 1500);
mJobLoads [ jtWRITE ].setTargetLatency (1750, 2500);
mJobLoads [ jtTRANSACTION_l ].setTargetLatency (100, 500);
mJobLoads [ jtPROPOSAL_t ].setTargetLatency (100, 500);
m_loads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000);
m_loads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000);
m_loads [ jtPROOFWORK ].setTargetLatency (2000, 5000);
m_loads [ jtTRANSACTION ].setTargetLatency (250, 1000);
m_loads [ jtPROPOSAL_ut ].setTargetLatency (500, 1250);
m_loads [ jtPUBLEDGER ].setTargetLatency (3000, 4500);
m_loads [ jtWAL ].setTargetLatency (1000, 2500);
m_loads [ jtVALIDATION_t ].setTargetLatency (500, 1500);
m_loads [ jtWRITE ].setTargetLatency (1750, 2500);
m_loads [ jtTRANSACTION_l ].setTargetLatency (100, 500);
m_loads [ jtPROPOSAL_t ].setTargetLatency (100, 500);
mJobLoads [ jtCLIENT ].setTargetLatency (2000, 5000);
mJobLoads [ jtPEER ].setTargetLatency (200, 2500);
mJobLoads [ jtDISK ].setTargetLatency (500, 1000);
mJobLoads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500);
m_loads [ jtCLIENT ].setTargetLatency (2000, 5000);
m_loads [ jtPEER ].setTargetLatency (200, 2500);
m_loads [ jtDISK ].setTargetLatency (500, 1000);
m_loads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500);
mJobLoads [ jtNETOP_CLUSTER ].setTargetLatency (9999, 9999); // once per 10 seconds
mJobLoads [ jtNETOP_TIMER ].setTargetLatency (999, 999); // once per second
}
m_loads [ jtNETOP_CLUSTER ].setTargetLatency (9999, 9999); // once per 10 seconds
m_loads [ jtNETOP_TIMER ].setTargetLatency (999, 999); // once per second
}
JobQueue::~JobQueue ()
{
}
~JobQueueImp ()
{
}
void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc)
{
void addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc)
{
bassert (type != jtINVALID);
// FIXME: Workaround incorrect client shutdown ordering
// do not add jobs to a queue with no threads
bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
// If this goes off it means that a child didn't follow the Service API rules.
bassert (! isServiceStopped() && ! areServiceChildrenStopped());
// Don't even add it to the queue if we're stopping
// and the job type is marked for skipOnStop.
//
if (isServiceStopping() && skipOnStop (type))
{
m_journal.debug <<
"Skipping addJob ('" << name << "')";
return;
}
{
ScopedLock lock (m_mutex);
std::pair< std::set <Job>::iterator, bool > it =
m_state.jobSet.insert (Job (
type, name, ++m_state.lastJob, mJobLoads[type], jobFunc));
m_jobSet.insert (Job (
type, name, ++m_lastJob, m_loads[type], jobFunc, m_cancelCallback));
// start timing how long it stays in the queue
it.first->peekEvent().start();
queueJob (*it.first, lock);
}
}
}
int JobQueue::getJobCount (JobType t)
{
int getJobCount (JobType t)
{
ScopedLock lock (m_mutex);
JobCounts::const_iterator c = m_state.jobCounts.find (t);
MapType::const_iterator c = m_jobCounts.find (t);
return (c == m_state.jobCounts.end ()) ? 0 : c->second.waiting;
}
return (c == m_jobCounts.end ()) ? 0 : c->second.waiting;
}
int JobQueue::getJobCountTotal (JobType t)
{
int getJobCountTotal (JobType t)
{
ScopedLock lock (m_mutex);
JobCounts::const_iterator c = m_state.jobCounts.find (t);
MapType::const_iterator c = m_jobCounts.find (t);
return (c == m_state.jobCounts.end ()) ? 0 : (c->second.waiting + c->second.running);
}
return (c == m_jobCounts.end ()) ? 0 : (c->second.waiting + c->second.running);
}
int JobQueue::getJobCountGE (JobType t)
{
int getJobCountGE (JobType t)
{
// return the number of jobs at this priority level or greater
int ret = 0;
ScopedLock lock (m_mutex);
typedef JobCounts::value_type jt_int_pair;
typedef MapType::value_type jt_int_pair;
BOOST_FOREACH (jt_int_pair const& it, m_state.jobCounts)
BOOST_FOREACH (jt_int_pair const& it, m_jobCounts)
{
if (it.first >= t)
ret += it.second.waiting;
}
return ret;
}
}
std::vector< std::pair<JobType, std::pair<int, int> > > JobQueue::getJobCounts ()
{
std::vector< std::pair<JobType, std::pair<int, int> > > getJobCounts ()
{
// return all jobs at all priority levels
std::vector< std::pair<JobType, std::pair<int, int> > > ret;
ScopedLock lock (m_mutex);
ret.reserve (m_state.jobCounts.size ());
ret.reserve (m_jobCounts.size ());
typedef JobCounts::value_type jt_int_pair;
typedef MapType::value_type jt_int_pair;
BOOST_FOREACH (const jt_int_pair & it, m_state.jobCounts)
BOOST_FOREACH (const jt_int_pair & it, m_jobCounts)
{
ret.push_back (std::make_pair (it.second.type,
std::make_pair (it.second.waiting, it.second.running)));
}
return ret;
}
}
Json::Value JobQueue::getJson (int)
{
// Shut down the job queue without completing pending jobs.
// Pauses all worker threads and blocks until they are idle.
//
void shutdown ()
{
m_journal.info << "Job queue shutting down";
m_workers.pauseAllThreadsAndWait ();
}
// set the number of thread serving the job queue to precisely this number
// Set the number of threads serving the job queue to precisely this
// number. Standalone mode forces a single thread; c == 0 requests
// auto-tuning from the detected CPU count (clamped to [2, 4], plus 2).
void setThreadCount (int c, bool const standaloneMode)
{
if (standaloneMode)
{
c = 1;
}
else if (c == 0)
{
c = SystemStats::getNumCpus ();
// VFALCO NOTE According to boost, hardware_concurrency cannot return
// negative numbers.
//
if (c < 0)
c = 2; // VFALCO NOTE Why 2?
if (c > 4) // I/O will bottleneck
c = 4;
c += 2;
m_journal.info << "Auto-tuning to " << c << " validation/transaction/proposal threads";
}
m_workers.setNumberOfThreads (c);
}
// Create a shared LoadEvent tied to the load monitor for job type t.
// The final `true` argument is forwarded to the LoadEvent constructor.
LoadEvent::pointer getLoadEvent (JobType t, const std::string& name)
{
return boost::make_shared<LoadEvent> (boost::ref (m_loads[t]), name, true);
}
// Same as getLoadEvent but returning single-ownership (autoptr)
// instead of a shared pointer.
LoadEvent::autoptr getLoadEventAP (JobType t, const std::string& name)
{
return LoadEvent::autoptr (new LoadEvent (m_loads[t], name, true));
}
// Returns `true` if any per-type load monitor reports being over
// its target. Every monitor is queried (no early exit), exactly as
// the tally-based original did.
bool isOverloaded ()
{
    bool anyOver = false;
    for (int type = 0; type < NUM_JOB_TYPES; ++type)
    {
        if (m_loads [type].isOver ())
            anyOver = true;
    }
    return anyOver;
}
Json::Value getJson (int)
{
Json::Value ret (Json::objectValue);
ret["threads"] = m_workers.getNumberOfThreads ();
@@ -173,11 +273,11 @@ Json::Value JobQueue::getJson (int)
int threadCount;
bool isOver;
mJobLoads [i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver);
m_loads [i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver);
JobCounts::const_iterator it = m_state.jobCounts.find (type);
MapType::const_iterator it = m_jobCounts.find (type);
if (it == m_state.jobCounts.end ())
if (it == m_jobCounts.end ())
{
jobCount = 0;
threadCount = 0;
@@ -219,79 +319,55 @@ Json::Value JobQueue::getJson (int)
ret["job_types"] = priorities;
return ret;
}
bool JobQueue::isOverloaded ()
{
int count = 0;
for (int i = 0; i < NUM_JOB_TYPES; ++i)
if (mJobLoads[i].isOver ())
++count;
return count > 0;
}
// shut down the job queue without completing pending jobs
//
void JobQueue::shutdown ()
{
WriteLog (lsINFO, JobQueue) << "Job queue shutting down";
m_workers.pauseAllThreadsAndWait ();
}
// set the number of thread serving the job queue to precisely this number
void JobQueue::setThreadCount (int c, bool const standaloneMode)
{
if (standaloneMode)
{
c = 1;
}
else if (c == 0)
{
c = SystemStats::getNumCpus ();
// VFALCO NOTE According to boost, hardware_concurrency cannot return
// negative numbers/
private:
//------------------------------------------------------------------------------
// Signals the service stopped if the stopped condition is met.
//
if (c < 0)
c = 2; // VFALCO NOTE Why 2?
if (c > 4) // I/O will bottleneck
c = 4;
c += 2;
WriteLog (lsINFO, JobQueue) << "Auto-tuning to " << c << " validation/transaction/proposal threads";
// Signals the service stopped if the stopped condition is met.
// The ScopedLock parameter is unused in the body; it exists so the
// signature proves the caller already holds m_mutex.
//
void checkStopped (ScopedLock const& lock)
{
// We are stopped when all of the following are true:
//
// 1. A stop notification was received
// 2. All Service children have stopped
// 3. There are no executing calls to processTask
// 4. There are no remaining Jobs in the job set
//
if (isServiceStopping() &&
areServiceChildrenStopped() &&
(m_processCount == 0) &&
m_jobSet.empty())
{
serviceStopped();
}
}
m_workers.setNumberOfThreads (c);
}
//------------------------------------------------------------------------------
//
// Signals an added Job for processing.
//
// Pre-conditions:
// The JobType must be valid.
// The Job must exist in mJobSet.
// The Job must not have previously been queued.
//
// Post-conditions:
// Count of waiting jobs of that type will be incremented.
// If JobQueue exists, and has at least one thread, Job will eventually run.
//
// Invariants:
// The calling thread owns the JobLock
//
void JobQueue::queueJob (Job const& job, ScopedLock const& lock)
{
//------------------------------------------------------------------------------
//
// Signals an added Job for processing.
//
// Pre-conditions:
// The JobType must be valid.
// The Job must exist in mJobSet.
// The Job must not have previously been queued.
//
// Post-conditions:
// Count of waiting jobs of that type will be incremented.
// If JobQueue exists, and has at least one thread, Job will eventually run.
//
// Invariants:
// The calling thread owns the JobLock
//
void queueJob (Job const& job, ScopedLock const& lock)
{
JobType const type (job.getType ());
bassert (type != jtINVALID);
bassert (m_state.jobSet.find (job) != m_state.jobSet.end ());
bassert (m_jobSet.find (job) != m_jobSet.end ());
Count& count (m_state.jobCounts [type]);
Count& count (m_jobCounts [type]);
if (count.waiting + count.running < getJobLimit (type))
{
@@ -304,36 +380,36 @@ void JobQueue::queueJob (Job const& job, ScopedLock const& lock)
++count.deferred;
}
++count.waiting;
}
}
//------------------------------------------------------------------------------
//
// Returns the next Job we should run now.
//
// RunnableJob:
// A Job in the JobSet whose slots count for its type is greater than zero.
//
// Pre-conditions:
// mJobSet must not be empty.
// mJobSet holds at least one RunnableJob
//
// Post-conditions:
// job is a valid Job object.
// job is removed from mJobQueue.
// Waiting job count of it's type is decremented
// Running job count of it's type is incremented
//
// Invariants:
// The calling thread owns the JobLock
//
void JobQueue::getNextJob (Job& job, ScopedLock const& lock)
{
bassert (! m_state.jobSet.empty ());
//------------------------------------------------------------------------------
//
// Returns the next Job we should run now.
//
// RunnableJob:
// A Job in the JobSet whose slots count for its type is greater than zero.
//
// Pre-conditions:
// mJobSet must not be empty.
// mJobSet holds at least one RunnableJob
//
// Post-conditions:
// job is a valid Job object.
// job is removed from mJobQueue.
// Waiting job count of it's type is decremented
// Running job count of it's type is incremented
//
// Invariants:
// The calling thread owns the JobLock
//
void getNextJob (Job& job, ScopedLock const& lock)
{
bassert (! m_jobSet.empty ());
JobSet::const_iterator iter;
for (iter = m_state.jobSet.begin (); iter != m_state.jobSet.end (); ++iter)
for (iter = m_jobSet.begin (); iter != m_jobSet.end (); ++iter)
{
Count& count (m_state.jobCounts [iter->getType ()]);
Count& count (m_jobCounts [iter->getType ()]);
bassert (count.running <= getJobLimit (count.type));
@@ -345,48 +421,43 @@ void JobQueue::getNextJob (Job& job, ScopedLock const& lock)
}
}
bassert (iter != m_state.jobSet.end ());
bassert (iter != m_jobSet.end ());
JobType const type = iter->getType ();
Count& count (m_state.jobCounts [type]);
Count& count (m_jobCounts [type]);
bassert (type != jtINVALID);
job = *iter;
m_state.jobSet.erase (iter);
m_jobSet.erase (iter);
--count.waiting;
++count.running;
}
}
//------------------------------------------------------------------------------
//
// Indicates that a running Job has completed its task.
//
// Pre-conditions:
// Job must not exist in mJobSet.
// The JobType must not be invalid.
//
// Post-conditions:
// The running count of that JobType is decremented
// A new task is signaled if there are more waiting Jobs than the limit, if any.
//
// Invariants:
// <none>
//
void JobQueue::finishJob (Job const& job)
{
//------------------------------------------------------------------------------
//
// Indicates that a running Job has completed its task.
//
// Pre-conditions:
// Job must not exist in mJobSet.
// The JobType must not be invalid.
//
// Post-conditions:
// The running count of that JobType is decremented
// A new task is signaled if there are more waiting Jobs than the limit, if any.
//
// Invariants:
// <none>
//
void finishJob (Job const& job, ScopedLock const& lock)
{
JobType const type = job.getType ();
{
ScopedLock lock (m_mutex);
bassert (m_state.jobSet.find (job) == m_state.jobSet.end ());
bassert (m_jobSet.find (job) == m_jobSet.end ());
bassert (type != jtINVALID);
Count& count (m_state.jobCounts [type]);
Count& count (m_jobCounts [type]);
// Queue a deferred task if possible
if (count.deferred > 0)
@@ -399,55 +470,107 @@ void JobQueue::finishJob (Job const& job)
--count.running;
}
}
//------------------------------------------------------------------------------
//
// Runs the next appropriate waiting Job.
//
// Pre-conditions:
// A RunnableJob must exist in the JobSet
//
// Post-conditions:
// The chosen RunnableJob will have Job::doJob() called.
//
// Invariants:
// <none>
//
void JobQueue::processTask ()
{
//------------------------------------------------------------------------------
//
// Runs the next appropriate waiting Job.
//
// Pre-conditions:
// A RunnableJob must exist in the JobSet
//
// Post-conditions:
// The chosen RunnableJob will have Job::doJob() called.
//
// Invariants:
// <none>
//
void processTask ()
{
Job job;
{
ScopedLock lock (m_mutex);
getNextJob (job, lock);
++m_processCount;
}
JobType const type (job.getType ());
String const name (Job::toString (type));
// Skip the job if we are stopping and the
// skipOnStop flag is set for the job type
//
if (!isServiceStopping() || !skipOnStop (type))
{
Thread::setCurrentThreadName (name);
WriteLog (lsTRACE, JobQueue) << "Doing " << name << " job";
m_journal.trace << "Doing " << name << " job";
job.doJob ();
}
else
{
m_journal.error << "Skipping processTask ('" << name << "')";
}
finishJob (job);
{
ScopedLock lock (m_mutex);
finishJob (job, lock);
--m_processCount;
checkStopped (lock);
}
// Note that when Job::~Job is called, the last reference
// to the associated LoadEvent object (in the Job) may be destroyed.
}
}
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Returns the limit of running jobs for the given job type.
// For jobs with no limit, we return the largest int. Hopefully that
// will be enough.
//
int JobQueue::getJobLimit (JobType type)
{
// Returns `true` if all jobs of this type should be skipped when
// the JobQueue receives a stop notification. If the job type isn't
// skipped, the Job will be called and the job must call Job::shouldCancel
// to determine if a long running or non-mandatory operation should be canceled.
static bool skipOnStop (JobType type)
{
switch (type)
{
// These are skipped when a stop notification is received
case jtPACK:
case jtPUBOLDLEDGER:
case jtVALIDATION_ut:
case jtPROOFWORK:
case jtTRANSACTION_l:
case jtPROPOSAL_ut:
case jtLEDGER_DATA:
case jtUPDATE_PF:
case jtCLIENT:
case jtTRANSACTION:
case jtUNL:
case jtADVANCE:
case jtPUBLEDGER:
case jtTXN_DATA:
case jtVALIDATION_t:
case jtPROPOSAL_t:
case jtSWEEP:
case jtNETOP_CLUSTER:
case jtNETOP_TIMER:
case jtADMIN:
return true;
// An unexpected type asserts in debug builds, then deliberately
// falls through and is treated as must-run (not skipped).
default:
bassertfalse;
case jtWAL:
case jtWRITE:
break;
}
return false;
}
// Returns the limit of running jobs for the given job type.
// For jobs with no limit, we return the largest int. Hopefully that
// will be enough.
//
static int getJobLimit (JobType type)
{
int limit = std::numeric_limits <int>::max ();
switch (type)
@@ -506,4 +629,80 @@ int JobQueue::getJobLimit (JobType type)
};
return limit;
}
//--------------------------------------------------------------------------
// Service: called when this JobQueue receives a stop notification.
// Intentionally a no-op: queued jobs are left in place and filtered at
// execution time by skipOnStop (see processTask). The commented-out
// code below shows why eager removal was abandoned.
void onServiceStop ()
{
// VFALCO NOTE I wanted to remove all the jobs that are skippable
//             but then the Workers count of tasks to process
//             goes wrong.
/*
{
ScopedLock lock (m_mutex);
// Remove all jobs whose type is skipOnStop
typedef boost::unordered_map <JobType, std::size_t> MapType;
MapType counts;
bool const report (m_journal.debug.active());
for (JobSet::const_iterator iter (m_jobSet.begin());
iter != m_jobSet.end();)
{
if (skipOnStop (iter->getType()))
{
if (report)
{
std::pair <MapType::iterator, bool> result (
counts.insert (std::make_pair (iter->getType(), 1)));
if (! result.second)
++(result.first->second);
}
iter = m_jobSet.erase (iter);
}
else
{
++iter;
}
}
if (report)
{
Journal::Stream s (m_journal.debug);
for (MapType::const_iterator iter (counts.begin());
iter != counts.end(); ++iter)
{
s << std::endl <<
"Removed " << iter->second <<
" skiponStop jobs of type " << Job::toString (iter->first);
}
}
}
*/
}
void onServiceChildrenStopped ()
{
ScopedLock lock (m_mutex);
checkStopped (lock);
}
};
//------------------------------------------------------------------------------
// Construct the abstract JobQueue base, forwarding the name and
// parent to the Service base class. Concrete behavior lives in
// JobQueueImp (see JobQueue::New).
JobQueue::JobQueue (char const* name, Service& parent)
: Service (name, parent)
{
}
//------------------------------------------------------------------------------
// Factory: creates the concrete JobQueueImp. Caller owns the result.
JobQueue* JobQueue::New (Service& parent, Journal journal)
{
return new JobQueueImp (parent, journal);
}

View File

@@ -4,94 +4,53 @@
*/
//==============================================================================
#ifndef RIPPLE_JOBQUEUE_H_INCLUDED
#define RIPPLE_JOBQUEUE_H_INCLUDED
#ifndef RIPPLE_CORE_JOBQUEUE_H_INCLUDED
#define RIPPLE_CORE_JOBQUEUE_H_INCLUDED
class JobQueue : private Workers::Callback
class JobQueue : public Service
{
protected:
JobQueue (char const* name, Service& parent);
public:
// Statistics on a particular JobType
struct Count
{
Count () noexcept;
explicit Count (JobType type) noexcept;
static JobQueue* New (Service& parent, Journal journal);
JobType type; // The type of Job these counts reflect
int waiting; // The number waiting
int running; // How many are running
int deferred; // Number of jobs we didn't signal due to limits
};
typedef std::map <JobType, Count> JobCounts;
//--------------------------------------------------------------------------
JobQueue ();
~JobQueue ();
virtual ~JobQueue () { }
// VFALCO TODO make convenience functions that allow the caller to not
// have to call bind.
//
void addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& job);
virtual void addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& job) = 0;
int getJobCount (JobType t); // Jobs waiting at this priority
// Jobs waiting at this priority
virtual int getJobCount (JobType t) = 0;
int getJobCountTotal (JobType t); // Jobs waiting plus running at this priority
// Jobs waiting plus running at this priority
virtual int getJobCountTotal (JobType t) = 0;
int getJobCountGE (JobType t); // All waiting jobs at or greater than this priority
// All waiting jobs at or greater than this priority
virtual int getJobCountGE (JobType t) = 0;
std::vector< std::pair<JobType, std::pair<int, int> > > getJobCounts (); // jobs waiting, threads doing
// jobs waiting, threads doing
virtual std::vector< std::pair<JobType, std::pair<int, int> > > getJobCounts () = 0;
void shutdown ();
virtual void shutdown () = 0;
void setThreadCount (int c, bool const standaloneMode);
virtual void setThreadCount (int c, bool const standaloneMode) = 0;
// VFALCO TODO Rename these to newLoadEventMeasurement or something similar
// since they create the object.
//
LoadEvent::pointer getLoadEvent (JobType t, const std::string& name)
{
return boost::make_shared<LoadEvent> (boost::ref (mJobLoads[t]), name, true);
}
virtual LoadEvent::pointer getLoadEvent (JobType t, const std::string& name) = 0;
// VFALCO TODO Why do we need two versions, one which returns a shared
// pointer and the other which returns an autoptr?
//
LoadEvent::autoptr getLoadEventAP (JobType t, const std::string& name)
{
return LoadEvent::autoptr (new LoadEvent (mJobLoads[t], name, true));
}
virtual LoadEvent::autoptr getLoadEventAP (JobType t, const std::string& name) = 0;
bool isOverloaded ();
virtual bool isOverloaded () = 0;
Json::Value getJson (int c = 0);
private:
typedef std::set <Job> JobSet;
struct State
{
State ();
uint64 lastJob;
JobSet jobSet;
JobCounts jobCounts;
};
void queueJob (Job const& job, ScopedLock const& lock);
void getNextJob (Job& job, ScopedLock const& lock);
void finishJob (Job const& job);
void processTask ();
static int getJobLimit (JobType type);
private:
typedef CriticalSection::ScopedLockType ScopedLock;
CriticalSection m_mutex;
State m_state;
Workers m_workers;
LoadMonitor mJobLoads [NUM_JOB_TYPES];
virtual Json::Value getJson (int c = 0) = 0;
};
#endif

View File

@@ -193,6 +193,11 @@ public:
return m_batch.getWriteLoad ();
}
// Backend: asynchronous stop request; forward to the batch writer,
// which reports via Callback::writeStopped when writes have drained.
void stopAsync ()
{
m_batch.stopAsync();
}
//--------------------------------------------------------------------------
void writeBatch (NodeStore::Batch const& batch)
@@ -200,6 +205,11 @@ public:
storeBatch (batch);
}
// BatchWriter::Callback: the last pending batch write has completed
// after a stop request; relay the notification to the scheduler.
void writeStopped ()
{
m_scheduler.scheduledTasksStopped ();
}
private:
size_t const m_keyBytes;
NodeStore::Scheduler& m_scheduler;

View File

@@ -134,6 +134,11 @@ public:
return 0;
}
// Backend: asynchronous stop request. This backend has no batch
// writer, so notify the scheduler immediately that tasks are done.
void stopAsync ()
{
m_scheduler.scheduledTasksStopped ();
}
//--------------------------------------------------------------------------
private:

View File

@@ -193,6 +193,11 @@ public:
return m_batch.getWriteLoad ();
}
// Backend: asynchronous stop request; forward to the batch writer,
// which reports via Callback::writeStopped when writes have drained.
void stopAsync ()
{
m_batch.stopAsync ();
}
//--------------------------------------------------------------------------
void writeBatch (NodeStore::Batch const& batch)
@@ -200,6 +205,11 @@ public:
storeBatch (batch);
}
// BatchWriter::Callback: the last pending batch write has completed
// after a stop request; relay the notification to the scheduler.
void writeStopped ()
{
m_scheduler.scheduledTasksStopped ();
}
private:
size_t const m_keyBytes;
NodeStore::Scheduler& m_scheduler;

View File

@@ -229,6 +229,11 @@ public:
return m_batch.getWriteLoad ();
}
// Backend: asynchronous stop request; forward to the batch writer,
// which reports via Callback::writeStopped when writes have drained.
void stopAsync ()
{
m_batch.stopAsync();
}
//--------------------------------------------------------------------------
void writeBatch (Batch const& batch)
@@ -236,6 +241,11 @@ public:
storeBatch (batch);
}
// BatchWriter::Callback: the last pending batch write has completed
// after a stop request; relay the notification to the scheduler.
void writeStopped ()
{
m_scheduler.scheduledTasksStopped ();
}
private:
size_t const m_keyBytes;
NodeStore::Scheduler& m_scheduler;

View File

@@ -10,8 +10,10 @@ private:
typedef std::map <uint256 const, NodeObject::Ptr> Map;
public:
// Construct the in-memory backend, keeping the scheduler reference so
// stopAsync can deliver the stopped notification. keyValues is
// accepted for Backend interface parity but unused here.
// NOTE(review): removed the stale two-argument signature line that
// the diff interleave left duplicated above the new one.
Backend (size_t keyBytes, StringPairArray const& keyValues,
    NodeStore::Scheduler& scheduler)
    : m_keyBytes (keyBytes)
    , m_scheduler (scheduler)
{
}
@@ -71,12 +73,18 @@ public:
return 0;
}
// Backend: asynchronous stop request. The memory backend writes
// synchronously, so notify the scheduler immediately.
void stopAsync ()
{
m_scheduler.scheduledTasksStopped ();
}
//--------------------------------------------------------------------------
private:
size_t const m_keyBytes;
Map m_map;
NodeStore::Scheduler& m_scheduler;
};
//------------------------------------------------------------------------------
@@ -104,7 +112,7 @@ NodeStore::Backend* MemoryBackendFactory::createInstance (
StringPairArray const& keyValues,
NodeStore::Scheduler& scheduler)
{
return new MemoryBackendFactory::Backend (keyBytes, keyValues);
return new MemoryBackendFactory::Backend (keyBytes, keyValues, scheduler);
}
//------------------------------------------------------------------------------

View File

@@ -173,7 +173,7 @@ void NodeStore::BatchWriter::store (NodeObject::ref object)
{
mWritePending = true;
m_scheduler.scheduleTask (this);
m_scheduler.scheduleTask (*this);
}
}
@@ -184,6 +184,14 @@ int NodeStore::BatchWriter::getWriteLoad ()
return std::max (mWriteLoad, static_cast<int> (mWriteSet.size ()));
}
// Request an asynchronous stop of batch writing. If no write is
// pending we can report stopped right away; otherwise writeBatch
// reports it when the in-flight write drains (writeBatch calls
// m_callback.writeStopped once mWritePending clears).
void NodeStore::BatchWriter::stopAsync ()
{
LockType::scoped_lock sl (mWriteMutex);
if (! mWritePending)
m_callback.writeStopped ();
}
void NodeStore::BatchWriter::performScheduledTask ()
{
writeBatch ();
@@ -212,6 +220,8 @@ void NodeStore::BatchWriter::writeBatch ()
mWritePending = false;
mWriteLoad = 0;
m_callback.writeStopped ();
// VFALCO NOTE Fix this function to not return from the middle
return;
}
@@ -241,10 +251,13 @@ class NodeStoreImp
, LeakChecked <NodeStoreImp>
{
public:
NodeStoreImp (Parameters const& backendParameters,
NodeStoreImp (char const* name,
Service& parent,
Parameters const& backendParameters,
Parameters const& fastBackendParameters,
Scheduler& scheduler)
: m_scheduler (scheduler)
: NodeStore (name, parent)
, m_scheduler (scheduler)
, m_backend (createBackend (backendParameters, scheduler))
, m_fastBackend ((fastBackendParameters.size () > 0)
? createBackend (fastBackendParameters, scheduler) : nullptr)
@@ -484,6 +497,19 @@ public:
sourceDatabase.visitAll (callback);
}
//------------------------------------------------------------------------------
//
// Service
// Service: stop notification. Forward the stop request to the
// backend(s); each backend's stopAsync eventually notifies the
// scheduler (see Backend::stopAsync implementations).
void onServiceStop ()
{
// notify the Backend
m_backend->stopAsync();
if (m_fastBackend)
m_fastBackend->stopAsync();
}
//------------------------------------------------------------------------------
static void missing_backend ()
@@ -551,9 +577,13 @@ NodeStore::Scheduler& NodeStore::getSynchronousScheduler ()
// Simple scheduler that performs the task immediately
//
// NOTE(review): the original span interleaved the old pointer-taking
// scheduleTask overload with the new reference-taking one, and the
// stale `task->performScheduledTask ()` line had landed inside
// scheduledTasksStopped. Reconstructed to the intended new form.
struct SynchronousScheduler : Scheduler
{
    void scheduleTask (Task& task)
    {
        // Run the task on the caller's thread, right now.
        task.performScheduledTask ();
    }

    void scheduledTasksStopped ()
    {
        // Tasks execute synchronously, so nothing is ever pending.
    }
};
@@ -583,11 +613,24 @@ void NodeStore::addAvailableBackends ()
NodeStore::addBackendFactory (KeyvaDBBackendFactory::getInstance ());
}
NodeStore* NodeStore::New (Parameters const& backendParameters,
//------------------------------------------------------------------------------
// Construct the abstract NodeStore base, forwarding the name and
// parent to the Service base class (see NodeStore::New for the
// concrete factory).
NodeStore::NodeStore (char const* name, Service& parent)
: Service (name, parent)
{
}
//------------------------------------------------------------------------------
NodeStore* NodeStore::New (char const* name,
Service& parent,
Parameters const& backendParameters,
Parameters fastBackendParameters,
Scheduler& scheduler)
{
return new NodeStoreImp (backendParameters,
return new NodeStoreImp (name,
parent,
backendParameters,
fastBackendParameters,
scheduler);
}
@@ -1032,10 +1075,17 @@ static NodeStoreTimingTests nodeStoreTimingTests;
class NodeStoreTests : public NodeStoreUnitTest
{
public:
NodeStoreTests () : NodeStoreUnitTest ("NodeStore")
NodeStoreTests ()
: NodeStoreUnitTest ("NodeStore")
{
}
~NodeStoreTests ()
{
}
//--------------------------------------------------------------------------
void testImport (String destBackendType, String srcBackendType, int64 seedValue)
{
File const node_db (File::createTempFile ("node_db"));
@@ -1049,7 +1099,8 @@ public:
// Write to source db
{
ScopedPointer <NodeStore> src (NodeStore::New (srcParams));
ScopedService service ("test");
ScopedPointer <NodeStore> src (NodeStore::New ("test", service, srcParams));
storeBatch (*src, batch);
}
@@ -1057,8 +1108,10 @@ public:
NodeStore::Batch copy;
{
ScopedService service ("test");
// Re-open the db
ScopedPointer <NodeStore> src (NodeStore::New (srcParams));
ScopedPointer <NodeStore> src (NodeStore::New ("test", service, srcParams));
// Set up the destination database
File const dest_db (File::createTempFile ("dest_db"));
@@ -1066,7 +1119,7 @@ public:
destParams.set ("type", destBackendType);
destParams.set ("path", dest_db.getFullPathName ());
ScopedPointer <NodeStore> dest (NodeStore::New (destParams));
ScopedPointer <NodeStore> dest (NodeStore::New ("test", service, destParams));
beginTestCase (String ("import into '") + destBackendType + "' from '" + srcBackendType + "'");
@@ -1116,8 +1169,10 @@ public:
createPredictableBatch (batch, 0, numObjectsToTest, seedValue);
{
ScopedService service ("test");
// Open the database
ScopedPointer <NodeStore> db (NodeStore::New (nodeParams, tempParams));
ScopedPointer <NodeStore> db (NodeStore::New ("test", service, nodeParams, tempParams));
// Write the batch
storeBatch (*db, batch);
@@ -1141,8 +1196,10 @@ public:
if (testPersistence)
{
{
ScopedService service ("test");
// Re-open the database without the ephemeral DB
ScopedPointer <NodeStore> db (NodeStore::New (nodeParams));
ScopedPointer <NodeStore> db (NodeStore::New ("test", service, nodeParams));
// Read it back in
NodeStore::Batch copy;
@@ -1156,8 +1213,11 @@ public:
if (useEphemeralDatabase)
{
ScopedService service ("test");
// Verify the ephemeral db
ScopedPointer <NodeStore> db (NodeStore::New (tempParams, StringPairArray ()));
ScopedPointer <NodeStore> db (NodeStore::New ("test",
service, tempParams, StringPairArray ()));
// Read it back in
NodeStore::Batch copy;

View File

@@ -20,8 +20,11 @@
@see NodeObject
*/
class NodeStore
class NodeStore : public Service
{
protected:
NodeStore (char const* name, Service& parent);
public:
enum
{
@@ -114,7 +117,6 @@ public:
virtual ~Task () { }
/** Performs the task.
The call may take place on a foreign thread.
*/
virtual void performScheduledTask () = 0;
@@ -125,7 +127,10 @@ public:
Depending on the implementation, this could happen
immediately or get deferred.
*/
virtual void scheduleTask (Task* task) = 0;
virtual void scheduleTask (Task& task) = 0;
/** Notifies the scheduler that all tasks are complete. */
virtual void scheduledTasksStopped () = 0;
};
//--------------------------------------------------------------------------
@@ -149,6 +154,9 @@ public:
struct Callback
{
virtual void writeBatch (Batch const& batch) = 0;
// Called after stopAsync when there is no more pending write
virtual void writeStopped () = 0;
};
/** Create a batch writer. */
@@ -170,6 +178,9 @@ public:
/** Get an estimate of the amount of writing I/O pending. */
int getWriteLoad ();
/** Called to notify that the NodeStore wants to stop. */
void stopAsync ();
private:
void performScheduledTask ();
void writeBatch ();
@@ -280,6 +291,9 @@ public:
/** Estimate the number of write operations pending. */
virtual int getWriteLoad () = 0;
/** Called to notify the Backend that the NodeStore wants to stop. */
virtual void stopAsync () = 0;
};
//--------------------------------------------------------------------------
@@ -331,7 +345,9 @@ public:
@return The opened database.
*/
static NodeStore* New (Parameters const& backendParameters,
static NodeStore* New (char const* name,
Service& parent,
Parameters const& backendParameters,
Parameters fastBackendParameters = Parameters (),
Scheduler& scheduler = getSynchronousScheduler ());

View File

@@ -7,7 +7,8 @@
class NullBackendFactory::Backend : public NodeStore::Backend
{
public:
Backend ()
explicit Backend (NodeStore::Scheduler& scheduler)
: m_scheduler (scheduler)
{
}
@@ -41,6 +42,14 @@ public:
{
return 0;
}
// Backend: asynchronous stop request. The null backend has no pending
// work, so notify the scheduler immediately.
void stopAsync ()
{
m_scheduler.scheduledTasksStopped ();
}
private:
NodeStore::Scheduler& m_scheduler;
};
//------------------------------------------------------------------------------
@@ -66,9 +75,9 @@ String NullBackendFactory::getName () const
// Factory: creates the null backend. Size and key/value parameters
// are intentionally ignored; only the scheduler is retained.
// NOTE(review): the diff interleave left both the old unnamed
// parameter and both return statements; collapsed to the new form.
NodeStore::Backend* NullBackendFactory::createInstance (
    size_t,
    StringPairArray const&,
    NodeStore::Scheduler& scheduler)
{
    return new NullBackendFactory::Backend (scheduler);
}
//------------------------------------------------------------------------------

View File

@@ -4,17 +4,10 @@
*/
//==============================================================================
#ifndef RIPPLE_CORE_RIPPLEHEADER
#define RIPPLE_CORE_RIPPLEHEADER
#ifndef RIPPLE_CORE_H_INCLUDED
#define RIPPLE_CORE_H_INCLUDED
// For Validators
//
// VFALCO NOTE It is unfortunate that we are exposing boost/asio.hpp
// needlessly. Its only required because of the buffers types.
// The HTTPClient interface doesn't need asio (although the
// implementation does. This is also reuqired for
// UniformResourceLocator.
//
// VFALCO TODO For UniformResourceLocator, remove asap
#include "beast/modules/beast_asio/beast_asio.h"
#include "../ripple_basics/ripple_basics.h"

View File

@@ -0,0 +1,41 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
// Construct an AsyncService, forwarding name and parent to Service.
// m_pendingIo is default-initialized (the destructor asserts it is
// zero, so it presumably starts at zero — see ~AsyncService).
AsyncService::AsyncService (char const* name, Service& parent)
: Service (name, parent)
{
}
// Destroy the AsyncService. Every serviceCountIoPending call must have
// been balanced by a serviceCountIoComplete call before destruction.
AsyncService::~AsyncService ()
{
// If this goes off it means the
// AsyncService API contract was violated.
//
bassert (m_pendingIo.get() == 0);
}
// Atomically increment the count of outstanding asynchronous
// operations. Call whenever an async initiating function is issued.
void AsyncService::serviceCountIoPending ()
{
++m_pendingIo;
}
// Atomically decrement the count of outstanding asynchronous
// operations; call at the top of every completion handler.
//
// NOTE(review): the header documents the return as "true if the
// handler should return immediately", yet success (!ec) yields true
// here — confirm the intended contract with callers.
bool AsyncService::serviceCountIoComplete (boost::system::error_code const& ec)
{
    // If this goes off, the count is unbalanced.
    bassert (m_pendingIo.get() > 0);
    --m_pendingIo;

    return (! ec) || (ec == boost::asio::error::operation_aborted);
}
// Called after a stop notification once all pending I/O has completed.
// NOTE(review): the header says "The default implementation calls
// serviceStopped", but the call below is commented out — confirm
// which behavior is intended.
void AsyncService::onServiceIoComplete ()
{
//serviceStopped();
}

View File

@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_NET_ASYNCSERVICE_H_INCLUDED
#define RIPPLE_NET_ASYNCSERVICE_H_INCLUDED
/** Service subclass that helps with managing asynchronous stopping. */
class AsyncService : public Service
{
public:
/** Create the service with the specified name and parent. */
AsyncService (char const* name, Service& parent);
~AsyncService ();
/** Increments the count of pending I/O for the service.
This should be called every time an asynchronous initiating
function is called by the derived class.
Thread safety:
Safe to call from any thread at any time.
*/
void serviceCountIoPending ();
/** Decrements the count of pending I/O for the service.
This should be called at the very beginning of every completion
handler function in the derived class.
Thread safety:
Safe to call from any thread at any time.
@param ec The error_code of the completed asynchronous operation.
@return `true` if the handler should return immediately.
*/
bool serviceCountIoComplete (boost::system::error_code const& ec);
/** Called after a stop notification when all pending I/O is complete.
The default implementation calls serviceStopped.
@see serviceStopped
*/
virtual void onServiceIoComplete ();
private:
Atomic <int> m_pendingIo;
};
#endif

View File

@@ -40,27 +40,150 @@ static uint8_t SNTPQueryData[48] =
#define NTP_OFF_XMITTS_INT 10
#define NTP_OFF_XMITTS_FRAC 11
class SNTPClientImp
: public SNTPClient
, public Thread
, public LeakChecked <SNTPClientImp>
{
public:
class SNTPQuery
{
public:
bool mReceivedReply;
time_t mLocalTimeSent;
uint32 mQueryNonce;
SNTPClient::SNTPClient (boost::asio::io_service& service)
: mLock (this, "SNTPClient", __FILE__, __LINE__)
, mSocket (service)
, mTimer (service)
, mResolver (service)
SNTPQuery (time_t j = (time_t) - 1) : mReceivedReply (false), mLocalTimeSent (j)
{
;
}
};
//--------------------------------------------------------------------------
explicit SNTPClientImp (Service& parent)
: SNTPClient (parent)
, Thread ("SNTPClient")
, mLock (this, "SNTPClient", __FILE__, __LINE__)
, mSocket (m_io_service)
, mTimer (m_io_service)
, mResolver (m_io_service)
, mOffset (0)
, mLastOffsetUpdate ((time_t) - 1)
, mReceiveBuffer (256)
{
{
mSocket.open (boost::asio::ip::udp::v4 ());
mSocket.async_receive_from (boost::asio::buffer (mReceiveBuffer, 256), mReceiveEndpoint,
boost::bind (&SNTPClient::receivePacket, this, boost::asio::placeholders::error,
mSocket.async_receive_from (boost::asio::buffer (mReceiveBuffer, 256),
mReceiveEndpoint, boost::bind (
&SNTPClientImp::receivePacket, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
mTimer.expires_from_now (boost::posix_time::seconds (NTP_QUERY_FREQUENCY));
mTimer.async_wait (boost::bind (&SNTPClient::timerEntry, this, boost::asio::placeholders::error));
}
mTimer.async_wait (boost::bind (&SNTPClientImp::timerEntry, this, boost::asio::placeholders::error));
void SNTPClient::resolveComplete (const boost::system::error_code& error, boost::asio::ip::udp::resolver::iterator it)
{
startThread ();
}
~SNTPClientImp ()
{
stopThread ();
}
//--------------------------------------------------------------------------
// Thread entry point: runs the io_service until it is stopped
// (see onServiceStop), then reports this Service as stopped.
void run ()
{
m_io_service.run ();
serviceStopped ();
}
// Service: stop notification. Stopping the io_service causes run ()
// to return, which in turn calls serviceStopped ().
void onServiceStop ()
{
// HACK!
m_io_service.stop ();
}
//--------------------------------------------------------------------------
// Begin querying the given list of SNTP servers; logs and returns
// early when the list is empty.
//
// Cleaned up: the original declared an iterator named `it` just to
// test for emptiness and then shadowed it inside BOOST_FOREACH;
// use empty() and a distinct loop name instead (same behavior).
void init (const std::vector<std::string>& servers)
{
    if (servers.empty ())
    {
        WriteLog (lsINFO, SNTPClient) << "SNTP: no server specified";
        return;
    }

    BOOST_FOREACH (const std::string & server, servers)
        addServer (server);
    queryAll ();
}
// Add one server to the query list; (time_t)-1 marks "never queried".
void addServer (const std::string& server)
{
    ScopedLockType lock (mLock, __FILE__, __LINE__);
    mServers.push_back (std::make_pair (server, (time_t) - 1));
}
// Issue queries until doQuery reports that no server is eligible.
void queryAll ()
{
    for (;;)
    {
        if (! doQuery ())
            break;
        nothing ();
    }
}
// Report the last computed clock offset. Returns false when no offset
// has been computed yet or the last update is older than
// NTP_TIMESTAMP_VALID seconds.
bool getOffset (int& offset)
{
    ScopedLockType sl (mLock, __FILE__, __LINE__);

    bool const haveUpdate (mLastOffsetUpdate != (time_t) - 1);
    bool const stillValid (haveUpdate &&
        ((mLastOffsetUpdate + NTP_TIMESTAMP_VALID) >= time (NULL)));

    if (! stillValid)
        return false;

    offset = mOffset;
    return true;
}
// Attempt to query the least-recently-queried server.
// Returns false when there is no server, or when every server was
// queried within the last NTP_MIN_QUERY seconds.
bool doQuery ()
{
    ScopedLockType sl (mLock, __FILE__, __LINE__);

    typedef std::vector< std::pair<std::string, time_t> >::iterator ServerIter;
    ServerIter const end (mServers.end ());
    ServerIter best (end);

    // Pick the server with the oldest last-query time; a time of
    // (time_t)-1 means "never queried" and always wins.
    for (ServerIter cur (mServers.begin ()); cur != end; ++cur)
    {
        if ((best == end) || (cur->second == (time_t) - 1) || (cur->second < best->second))
            best = cur;
    }

    if (best == end)
    {
        WriteLog (lsTRACE, SNTPClient) << "SNTP: No server to query";
        return false;
    }

    time_t const now (time (NULL));

    if ((best->second != (time_t) - 1) && ((best->second + NTP_MIN_QUERY) >= now))
    {
        WriteLog (lsTRACE, SNTPClient) << "SNTP: All servers recently queried";
        return false;
    }

    best->second = now;

    boost::asio::ip::udp::resolver::query query (boost::asio::ip::udp::v4 (), best->first, "ntp");
    mResolver.async_resolve (query,
        boost::bind (&SNTPClientImp::resolveComplete, this,
            boost::asio::placeholders::error, boost::asio::placeholders::iterator));
#ifdef SNTP_DEBUG
    WriteLog (lsTRACE, SNTPClient) << "SNTP: Resolve pending for " << best->first;
#endif
    return true;
}
void resolveComplete (const boost::system::error_code& error, boost::asio::ip::udp::resolver::iterator it)
{
if (!error)
{
boost::asio::ip::udp::resolver::iterator sel = it;
@@ -89,20 +212,20 @@ void SNTPClient::resolveComplete (const boost::system::error_code& error, boost:
reinterpret_cast<uint32*> (SNTPQueryData)[NTP_OFF_XMITTS_INT] = static_cast<uint32> (time (NULL)) + NTP_UNIX_OFFSET;
reinterpret_cast<uint32*> (SNTPQueryData)[NTP_OFF_XMITTS_FRAC] = query.mQueryNonce;
mSocket.async_send_to (boost::asio::buffer (SNTPQueryData, 48), *sel,
boost::bind (&SNTPClient::sendComplete, this,
boost::bind (&SNTPClientImp::sendComplete, this,
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
}
}
}
void SNTPClient::receivePacket (const boost::system::error_code& error, std::size_t bytes_xferd)
{
void receivePacket (const boost::system::error_code& error, std::size_t bytes_xferd)
{
if (!error)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
#ifdef SNTP_DEBUG
#ifdef SNTP_DEBUG
WriteLog (lsTRACE, SNTPClient) << "SNTP: Packet from " << mReceiveEndpoint;
#endif
#endif
std::map<boost::asio::ip::udp::endpoint, SNTPQuery>::iterator query = mQueries.find (mReceiveEndpoint);
if (query == mQueries.end ())
@@ -126,17 +249,17 @@ void SNTPClient::receivePacket (const boost::system::error_code& error, std::siz
}
mSocket.async_receive_from (boost::asio::buffer (mReceiveBuffer, 256), mReceiveEndpoint,
boost::bind (&SNTPClient::receivePacket, this, boost::asio::placeholders::error,
boost::bind (&SNTPClientImp::receivePacket, this, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
void SNTPClient::sendComplete (const boost::system::error_code& error, std::size_t)
{
void sendComplete (const boost::system::error_code& error, std::size_t)
{
CondLog (error, lsWARNING, SNTPClient) << "SNTP: Send error";
}
}
void SNTPClient::processReply ()
{
void processReply ()
{
assert (mReceiveBuffer.size () >= 48);
uint32* recvBuffer = reinterpret_cast<uint32*> (&mReceiveBuffer.front ());
@@ -186,89 +309,50 @@ void SNTPClient::processReply ()
mOffset = 0;
CondLog (timev || mOffset, lsTRACE, SNTPClient) << "SNTP: Offset is " << timev << ", new system offset is " << mOffset;
}
}
void SNTPClient::timerEntry (const boost::system::error_code& error)
{
// Periodic timer handler: issue a query and re-arm the timer.
//
// NOTE(review): the original text armed the timer TWICE — once with a
// stale bind to &SNTPClient::timerEntry (which no longer exists on the
// new abstract base) and once with &SNTPClientImp::timerEntry. The
// stale duplicate is removed; the timer is armed exactly once.
void timerEntry (const boost::system::error_code& error)
{
    if (!error)
    {
        doQuery ();
        mTimer.expires_from_now (boost::posix_time::seconds (NTP_QUERY_FREQUENCY));
        mTimer.async_wait (boost::bind (&SNTPClientImp::timerEntry, this, boost::asio::placeholders::error));
    }
}
void SNTPClient::addServer (const std::string& server)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mServers.push_back (std::make_pair (server, (time_t) - 1));
}
void SNTPClient::init (const std::vector<std::string>& servers)
{
std::vector<std::string>::const_iterator it = servers.begin ();
if (it == servers.end ())
{
WriteLog (lsINFO, SNTPClient) << "SNTP: no server specified";
return;
}
BOOST_FOREACH (const std::string & it, servers)
addServer (it);
queryAll ();
}
private:
typedef RippleMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
LockType mLock;
void SNTPClient::queryAll ()
boost::asio::io_service m_io_service;
std::map <boost::asio::ip::udp::endpoint, SNTPQuery> mQueries;
boost::asio::ip::udp::socket mSocket;
boost::asio::deadline_timer mTimer;
boost::asio::ip::udp::resolver mResolver;
std::vector< std::pair<std::string, time_t> > mServers;
int mOffset;
time_t mLastOffsetUpdate;
std::list<int> mOffsetList;
std::vector<uint8_t> mReceiveBuffer;
boost::asio::ip::udp::endpoint mReceiveEndpoint;
};
//------------------------------------------------------------------------------
// Construct the abstract SNTPClient base as an AsyncService child.
//
// NOTE(review): the original body contained `while (doQuery ())
// nothing ();` — residue of the old queryAll implementation spliced in
// by the diff. The new abstract base declares no doQuery member (see
// the header), so that loop cannot belong here; the ctor body is empty.
SNTPClient::SNTPClient (Service& parent)
    : AsyncService ("SNTPClient", parent)
{
}
bool SNTPClient::getOffset (int& offset)
//------------------------------------------------------------------------------
// Factory: creates the concrete SNTPClientImp. Caller owns the result.
//
// NOTE(review): the original body contained lines from the old
// getOffset implementation (mLock/mOffset accesses) spliced before the
// return — those members do not exist on the new abstract base, so
// only the factory return remains.
SNTPClient* SNTPClient::New (Service& parent)
{
    return new SNTPClientImp (parent);
}
bool SNTPClient::doQuery ()
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
std::vector< std::pair<std::string, time_t> >::iterator best = mServers.end ();
for (std::vector< std::pair<std::string, time_t> >::iterator it = mServers.begin (), end = best;
it != end; ++it)
if ((best == end) || (it->second == (time_t) - 1) || (it->second < best->second))
best = it;
if (best == mServers.end ())
{
WriteLog (lsTRACE, SNTPClient) << "SNTP: No server to query";
return false;
}
time_t now = time (NULL);
if ((best->second != (time_t) - 1) && ((best->second + NTP_MIN_QUERY) >= now))
{
WriteLog (lsTRACE, SNTPClient) << "SNTP: All servers recently queried";
return false;
}
best->second = now;
boost::asio::ip::udp::resolver::query query (boost::asio::ip::udp::v4 (), best->first, "ntp");
mResolver.async_resolve (query,
boost::bind (&SNTPClient::resolveComplete, this,
boost::asio::placeholders::error, boost::asio::placeholders::iterator));
#ifdef SNTP_DEBUG
WriteLog (lsTRACE, SNTPClient) << "SNTP: Resolve pending for " << best->first;
#endif
return true;
}
// vim:ts=4

View File

@@ -7,63 +7,18 @@
#ifndef RIPPLE_NET_BASICS_SNTPCLIENT_H_INCLUDED
#define RIPPLE_NET_BASICS_SNTPCLIENT_H_INCLUDED
class SNTPQuery
class SNTPClient : public AsyncService
{
protected:
explicit SNTPClient (Service& parent);
public:
bool mReceivedReply;
time_t mLocalTimeSent;
uint32 mQueryNonce;
SNTPQuery (time_t j = (time_t) - 1) : mReceivedReply (false), mLocalTimeSent (j)
{
;
}
};
//------------------------------------------------------------------------------
// VFALCO TODO Make an abstract interface for this to hide the boost
//
class SNTPClient : LeakChecked <SNTPClient>
{
public:
explicit SNTPClient (boost::asio::io_service& service);
void init (std::vector <std::string> const& servers);
void addServer (std::string const& mServer);
void queryAll ();
bool doQuery ();
bool getOffset (int& offset);
private:
void receivePacket (const boost::system::error_code& error, std::size_t bytes);
void resolveComplete (const boost::system::error_code& error, boost::asio::ip::udp::resolver::iterator iterator);
void sentPacket (boost::shared_ptr<std::string>, const boost::system::error_code&, std::size_t);
void timerEntry (const boost::system::error_code&);
void sendComplete (const boost::system::error_code& error, std::size_t bytesTransferred);
void processReply ();
private:
typedef RippleMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
LockType mLock;
std::map <boost::asio::ip::udp::endpoint, SNTPQuery> mQueries;
boost::asio::ip::udp::socket mSocket;
boost::asio::deadline_timer mTimer;
boost::asio::ip::udp::resolver mResolver;
std::vector< std::pair<std::string, time_t> > mServers;
int mOffset;
time_t mLastOffsetUpdate;
std::list<int> mOffsetList;
std::vector<uint8_t> mReceiveBuffer;
boost::asio::ip::udp::endpoint mReceiveEndpoint;
static SNTPClient* New (Service& parent);
virtual ~SNTPClient() { }
virtual void init (std::vector <std::string> const& servers) = 0;
virtual void addServer (std::string const& mServer) = 0;
virtual void queryAll () = 0;
virtual bool getOffset (int& offset) = 0;
};
#endif

View File

@@ -32,11 +32,15 @@
namespace ripple
{
#include "basics/impl/MultiSocketType.h"
#include "basics/RippleSSLContext.cpp"
#include "basics/MultiSocket.cpp"
#include "basics/AsyncService.cpp"
#include "basics/HTTPRequest.cpp"
#include "basics/HTTPClient.cpp"
# include "basics/impl/MultiSocketType.h"
#include "basics/MultiSocket.cpp"
#include "basics/RippleSSLContext.cpp"
# include "basics/impl/RPCServerImp.h"
#include "basics/RPCDoor.cpp"
#include "basics/SNTPClient.cpp"

View File

@@ -19,6 +19,7 @@
namespace ripple
{
#include "basics/AsyncService.h"
#include "basics/RippleSSLContext.h"
#include "basics/MultiSocket.h"
#include "basics/HTTPRequest.h"

View File

@@ -15,7 +15,14 @@
// code assumes this node is synched (and will continue to do so until
// there's a functional network.
// VFALCO TODO Figure out how to clean up these globals
//------------------------------------------------------------------------------
// Construct the subscription-data Source base, forwarding the name
// and parent to the Service base class.
InfoSub::Source::Source (char const* name, Service& parent)
: Service (name, parent)
{
}
//------------------------------------------------------------------------------
InfoSub::InfoSub (Source& source)
: mLock (this, "InfoSub", __FILE__, __LINE__)

View File

@@ -30,8 +30,11 @@ public:
public:
/** Abstracts the source of subscription data.
*/
class Source
class Source : public Service
{
protected:
Source (char const* name, Service& parent);
public:
// VFALCO TODO Rename the 'rt' parameters to something meaningful.
virtual void subAccount (ref ispListener,