Add CollectorManager for Beast.Insight support

This commit is contained in:
Vinnie Falco
2013-12-14 20:20:33 -08:00
parent f88fcf55a3
commit 81d418007a
15 changed files with 223 additions and 7 deletions

View File

@@ -43,6 +43,7 @@
#include "../beast/beast/chrono/Chrono.cpp"
#include "../beast/beast/crypto/Crypto.cpp"
#include "../beast/beast/http/HTTP.cpp"
#include "../beast/beast/insight/Insight.cpp"
#include "../beast/beast/net/Net.cpp"
#include "../beast/beast/smart_ptr/SmartPtr.cpp"
#include "../beast/beast/strings/Strings.cpp"

View File

@@ -46,6 +46,8 @@ template <> char const* LogPartition::getPartitionName <LoadManagerLog> () { ret
class ResourceManagerLog;
template <> char const* LogPartition::getPartitionName <ResourceManagerLog> () { return "ResourceManager"; }
template <> char const* LogPartition::getPartitionName <CollectorManager> () { return "Collector"; }
//
//------------------------------------------------------------------------------
@@ -74,6 +76,10 @@ public:
, m_tempNodeCache ("NodeCache", 16384, 90)
, m_sleCache ("LedgerEntryCache", 4096, 120)
, m_collectorManager (CollectorManager::New (
getConfig().insightSettings,
LogPartition::getJournal <CollectorManager> ()))
, m_resourceManager (add (Resource::Manager::New (
LogPartition::getJournal <ResourceManagerLog> ())))
@@ -83,7 +89,9 @@ public:
// The JobQueue has to come pretty early since
// almost everything is a Stoppable child of the JobQueue.
//
, m_jobQueue (JobQueue::New (*this, LogPartition::getJournal <JobQueueLog> ()))
, m_jobQueue (JobQueue::New (
m_collectorManager->collector (),
*this, LogPartition::getJournal <JobQueueLog> ()))
// The io_service must be a child of the JobQueue since we call addJob
// in response to newtwork data from peers and also client requests.
@@ -165,6 +173,11 @@ public:
//--------------------------------------------------------------------------
CollectorManager& getCollectorManager ()
{
return *m_collectorManager;
}
RPC::Manager& getRPCServiceManager()
{
return *m_rpcServiceManager;
@@ -882,6 +895,7 @@ private:
LocalCredentials m_localCredentials;
TransactionMaster m_txMaster;
beast::unique_ptr <CollectorManager> m_collectorManager;
ScopedPointer <Resource::Manager> m_resourceManager;
ScopedPointer <RPC::Manager> m_rpcServiceManager;

View File

@@ -27,6 +27,7 @@ namespace NodeStore { class Database; }
namespace RPC { class Manager; }
// VFALCO TODO Fix forward declares required for header dependency loops
class CollectorManager;
class IFeatures;
class IFeeVote;
class IHashRouter;
@@ -77,9 +78,8 @@ public:
virtual ~Application () { }
virtual boost::asio::io_service& getIOService () = 0;
virtual CollectorManager& getCollectorManager () = 0;
virtual RPC::Manager& getRPCServiceManager() = 0;
virtual JobQueue& getJobQueue () = 0;
virtual SiteFiles::Manager& getSiteFiles () = 0;
virtual NodeCache& getTempNodeCache () = 0;

View File

@@ -0,0 +1,71 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
namespace ripple {
class CollectorManagerImp
: public CollectorManager
{
public:
Journal m_journal;
shared_ptr <insight::Collector> m_collector;
CollectorManagerImp (StringPairArray const& params,
Journal journal)
: m_journal (journal)
{
std::string const& server (params ["server"].toStdString());
if (server == "statsd")
{
IPAddress const address (IPAddress::from_string (
params ["address"].toStdString ()));
std::string const& prefix (params ["prefix"].toStdString ());
m_collector = insight::StatsDCollector::New (address, prefix, journal);
}
else
{
m_collector = insight::NullCollector::New ();
}
}
~CollectorManagerImp ()
{
}
shared_ptr <insight::Collector> const& collector ()
{
return m_collector;
}
};
//------------------------------------------------------------------------------
CollectorManager::~CollectorManager ()
{
}
CollectorManager* CollectorManager::New (StringPairArray const& params,
Journal journal)
{
return new CollectorManagerImp (params, journal);
}
}

View File

@@ -0,0 +1,39 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_RIPPLECOLLECTOR_H_INCLUDED
#define RIPPLE_RIPPLECOLLECTOR_H_INCLUDED
#include "beast/beast/Insight.h"
namespace ripple {
/** Provides the beast::insight::Collector service. */
class CollectorManager
{
public:
static CollectorManager* New (StringPairArray const& params,
Journal journal);
virtual ~CollectorManager () = 0;
virtual shared_ptr <insight::Collector> const& collector () = 0;
};
}
#endif

View File

@@ -42,6 +42,9 @@
#include "beast/beast/Asio.h"
# include "main/CollectorManager.h"
#include "main/CollectorManager.cpp"
namespace ripple {
//

View File

@@ -331,6 +331,8 @@ void Config::load ()
(void) SectionSingleB (secConfig, SECTION_RPC_PASSWORD, RPC_PASSWORD);
(void) SectionSingleB (secConfig, SECTION_RPC_USER, RPC_USER);
insightSettings = parseKeyValueSection (secConfig, SECTION_INSIGHT);
//---------------------------------------
//
// VFALCO BEGIN CLEAN

View File

@@ -298,6 +298,9 @@ public:
//--------------------------------------------------------------------------
/** Parameters for the insight collection module */
StringPairArray insightSettings;
/** Parameters for the main NodeStore database.
This is 1 or more strings of the form <key>=<value>

View File

@@ -47,6 +47,7 @@ struct ConfigSection
#define SECTION_FEE_ACCOUNT_RESERVE "fee_account_reserve"
#define SECTION_FEE_OWNER_RESERVE "fee_owner_reserve"
#define SECTION_LEDGER_HISTORY "ledger_history"
#define SECTION_INSIGHT "insight"
#define SECTION_IPS "ips"
#define SECTION_IPS_FIXED "ips_fixed"
#define SECTION_NETWORK_QUORUM "network_quorum"

View File

@@ -22,6 +22,12 @@ class JobQueueImp
, private Workers::Callback
{
public:
struct Metrics
{
insight::Hook hook;
insight::Gauge job_count;
};
// Statistics for each JobType
//
struct Count
@@ -53,6 +59,7 @@ public:
typedef CriticalSection::ScopedLockType ScopedLock;
Journal m_journal;
Metrics m_metrics;
CriticalSection m_mutex;
uint64 m_lastJob;
JobSet m_jobSet;
@@ -67,7 +74,8 @@ public:
//--------------------------------------------------------------------------
JobQueueImp (Stoppable& parent, Journal journal)
JobQueueImp (shared_ptr <insight::Collector> const& collector,
Stoppable& parent, Journal journal)
: JobQueue ("JobQueue", parent)
, m_journal (journal)
, m_lastJob (0)
@@ -75,6 +83,10 @@ public:
, m_workers (*this, "JobQueue", 0)
, m_cancelCallback (boost::bind (&Stoppable::isStopping, this))
{
m_metrics.hook = collector->make_hook (beast::bind (
&JobQueueImp::collect, this));
m_metrics.job_count = collector->make_gauge ("job_count");
{
ScopedLock lock (m_mutex);
@@ -112,6 +124,12 @@ public:
{
}
void collect ()
{
ScopedLock lock (m_mutex);
m_metrics.job_count = m_jobSet.size ();
}
void addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc)
{
bassert (type != jtINVALID);
@@ -708,7 +726,8 @@ JobQueue::JobQueue (char const* name, Stoppable& parent)
//------------------------------------------------------------------------------
JobQueue* JobQueue::New (Stoppable& parent, Journal journal)
JobQueue* JobQueue::New (shared_ptr <insight::Collector> const& collector,
Stoppable& parent, Journal journal)
{
return new JobQueueImp (parent, journal);
return new JobQueueImp (collector, parent, journal);
}

View File

@@ -26,7 +26,8 @@ protected:
JobQueue (char const* name, Stoppable& parent);
public:
static JobQueue* New (Stoppable& parent, Journal journal);
static JobQueue* New (shared_ptr <insight::Collector> const& collector,
Stoppable& parent, Journal journal);
virtual ~JobQueue () { }

View File

@@ -24,6 +24,7 @@
#include "../ripple_data/ripple_data.h"
#include "beast/beast/http/URL.h" // for Config
#include "beast/beast/Insight.h"
#include "../ripple/resource/api/LegacyFees.h"