Clean up and restructure sources

This commit is contained in:
Vinnie Falco
2014-09-16 14:03:58 -07:00
parent 1dcd06a1c1
commit 4239880acb
522 changed files with 3264 additions and 3400 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,132 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/core/Job.h>
namespace ripple {
Job::Job ()
: mType (jtINVALID)
, mJobIndex (0)
{
}
Job::Job (JobType type, std::uint64_t index)
: mType (type)
, mJobIndex (index)
{
}
Job::Job (JobType type,
std::string const& name,
std::uint64_t index,
LoadMonitor& lm,
std::function <void (Job&)> const& job,
CancelCallback cancelCallback)
: m_cancelCallback (cancelCallback)
, mType (type)
, mJobIndex (index)
, mJob (job)
, mName (name)
, m_queue_time (clock_type::now ())
{
m_loadEvent = std::make_shared <LoadEvent> (std::ref (lm), name, false);
}
JobType Job::getType () const
{
return mType;
}
Job::CancelCallback Job::getCancelCallback () const
{
bassert (m_cancelCallback);
return m_cancelCallback;
}
Job::clock_type::time_point const& Job::queue_time () const
{
return m_queue_time;
}
bool Job::shouldCancel () const
{
if (m_cancelCallback)
return m_cancelCallback ();
return false;
}
void Job::doJob ()
{
m_loadEvent->start ();
m_loadEvent->reName (mName);
mJob (*this);
}
void Job::rename (std::string const& newName)
{
mName = newName;
}
bool Job::operator> (const Job& j) const
{
if (mType < j.mType)
return true;
if (mType > j.mType)
return false;
return mJobIndex > j.mJobIndex;
}
bool Job::operator>= (const Job& j) const
{
if (mType < j.mType)
return true;
if (mType > j.mType)
return false;
return mJobIndex >= j.mJobIndex;
}
bool Job::operator< (const Job& j) const
{
if (mType < j.mType)
return false;
if (mType > j.mType)
return true;
return mJobIndex < j.mJobIndex;
}
bool Job::operator<= (const Job& j) const
{
if (mType < j.mType)
return false;
if (mType > j.mType)
return true;
return mJobIndex <= j.mJobIndex;
}
}

View File

@@ -0,0 +1,693 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/core/JobQueue.h>
#include <ripple/core/JobTypes.h>
#include <ripple/core/JobTypeInfo.h>
#include <ripple/core/JobTypeData.h>
#include <beast/cxx14/memory.h>
#include <beast/chrono/chrono_util.h>
#include <beast/module/core/thread/Workers.h>
#include <beast/module/core/system/SystemStats.h>
#include <chrono>
#include <mutex>
#include <set>
namespace ripple {
class JobQueueImp
: public JobQueue
, private beast::Workers::Callback
{
public:
typedef std::set <Job> JobSet;
typedef std::map <JobType, JobTypeData> JobDataMap;
typedef std::lock_guard <std::mutex> ScopedLock;
beast::Journal m_journal;
std::mutex m_mutex;
std::uint64_t m_lastJob;
JobSet m_jobSet;
JobDataMap m_jobData;
JobTypeData m_invalidJobData;
// The number of jobs currently in processTask()
int m_processCount;
beast::Workers m_workers;
Job::CancelCallback m_cancelCallback;
// statistics tracking
beast::insight::Collector::ptr m_collector;
beast::insight::Gauge job_count;
beast::insight::Hook hook;
//--------------------------------------------------------------------------
static JobTypes const& getJobTypes ()
{
static JobTypes types;
return types;
}
//--------------------------------------------------------------------------
JobQueueImp (beast::insight::Collector::ptr const& collector,
Stoppable& parent, beast::Journal journal)
: JobQueue ("JobQueue", parent)
, m_journal (journal)
, m_lastJob (0)
, m_invalidJobData (getJobTypes ().getInvalid (), collector)
, m_processCount (0)
, m_workers (*this, "JobQueue", 0)
, m_cancelCallback (std::bind (&Stoppable::isStopping, this))
, m_collector (collector)
{
hook = m_collector->make_hook (std::bind (
&JobQueueImp::collect, this));
job_count = m_collector->make_gauge ("job_count");
{
ScopedLock lock (m_mutex);
for (auto const& x : getJobTypes ())
{
JobTypeInfo const& jt = x.second;
// And create dynamic information for all jobs
auto const result (m_jobData.emplace (std::piecewise_construct,
std::forward_as_tuple (jt.type ()),
std::forward_as_tuple (jt, m_collector)));
assert (result.second == true);
}
}
}
~JobQueueImp ()
{
// Must unhook before destroying
hook = beast::insight::Hook ();
}
void collect ()
{
ScopedLock lock (m_mutex);
job_count = m_jobSet.size ();
}
void addJob (JobType type, std::string const& name,
boost::function <void (Job&)> const& jobFunc)
{
assert (type != jtINVALID);
JobDataMap::iterator iter (m_jobData.find (type));
assert (iter != m_jobData.end ());
if (iter == m_jobData.end ())
return;
JobTypeData& data (iter->second);
// FIXME: Workaround incorrect client shutdown ordering
// do not add jobs to a queue with no threads
assert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
{
// If this goes off it means that a child didn't follow
// the Stoppable API rules. A job may only be added if:
//
// - The JobQueue has NOT stopped
// AND
// * We are currently processing jobs
// OR
// * We have have pending jobs
// OR
// * Not all children are stopped
//
ScopedLock lock (m_mutex);
assert (! isStopped() && (
m_processCount>0 ||
! m_jobSet.empty () ||
! areChildrenStopped()));
}
// Don't even add it to the queue if we're stopping
// and the job type is marked for skipOnStop.
//
if (isStopping() && skipOnStop (type))
{
m_journal.debug <<
"Skipping addJob ('" << name << "')";
return;
}
{
ScopedLock lock (m_mutex);
std::pair <std::set <Job>::iterator, bool> result (
m_jobSet.insert (Job (type, name, ++m_lastJob,
data.load (), jobFunc, m_cancelCallback)));
queueJob (*result.first, lock);
}
}
int getJobCount (JobType t)
{
ScopedLock lock (m_mutex);
JobDataMap::const_iterator c = m_jobData.find (t);
return (c == m_jobData.end ())
? 0
: c->second.waiting;
}
int getJobCountTotal (JobType t)
{
ScopedLock lock (m_mutex);
JobDataMap::const_iterator c = m_jobData.find (t);
return (c == m_jobData.end ())
? 0
: (c->second.waiting + c->second.running);
}
int getJobCountGE (JobType t)
{
// return the number of jobs at this priority level or greater
int ret = 0;
ScopedLock lock (m_mutex);
for (auto const& x : m_jobData)
{
if (x.first >= t)
ret += x.second.waiting;
}
return ret;
}
// shut down the job queue without completing pending jobs
//
void shutdown ()
{
m_journal.info << "Job queue shutting down";
m_workers.pauseAllThreadsAndWait ();
}
// set the number of thread serving the job queue to precisely this number
void setThreadCount (int c, bool const standaloneMode)
{
if (standaloneMode)
{
c = 1;
}
else if (c == 0)
{
c = beast::SystemStats::getNumCpus ();
// VFALCO NOTE According to boost, hardware_concurrency cannot return
// negative numbers/
//
if (c < 0)
c = 2; // VFALCO NOTE Why 2?
if (c > 4) // I/O will bottleneck
c = 4;
c += 2;
m_journal.info << "Auto-tuning to " << c <<
" validation/transaction/proposal threads";
}
m_workers.setNumberOfThreads (c);
}
LoadEvent::pointer getLoadEvent (JobType t, std::string const& name)
{
JobDataMap::iterator iter (m_jobData.find (t));
assert (iter != m_jobData.end ());
if (iter == m_jobData.end ())
return std::shared_ptr<LoadEvent> ();
return std::make_shared<LoadEvent> (
std::ref (iter-> second.load ()), name, true);
}
LoadEvent::autoptr getLoadEventAP (JobType t, std::string const& name)
{
JobDataMap::iterator iter (m_jobData.find (t));
assert (iter != m_jobData.end ());
if (iter == m_jobData.end ())
return LoadEvent::autoptr ();
return LoadEvent::autoptr (
new LoadEvent (iter-> second.load (), name, true));
}
void addLoadEvents (JobType t,
int count, std::chrono::milliseconds elapsed)
{
JobDataMap::iterator iter (m_jobData.find (t));
assert (iter != m_jobData.end ());
iter->second.load().addSamples (count, elapsed);
}
bool isOverloaded ()
{
int count = 0;
for (auto& x : m_jobData)
{
if (x.second.load ().isOver ())
++count;
}
return count > 0;
}
Json::Value getJson (int)
{
Json::Value ret (Json::objectValue);
ret["threads"] = m_workers.getNumberOfThreads ();
Json::Value priorities = Json::arrayValue;
ScopedLock lock (m_mutex);
for (auto& x : m_jobData)
{
assert (x.first != jtINVALID);
if (x.first == jtGENERIC)
continue;
JobTypeData& data (x.second);
LoadMonitor::Stats stats (data.stats ());
int waiting (data.waiting);
int running (data.running);
if ((stats.count != 0) || (waiting != 0) ||
(stats.latencyPeak != 0) || (running != 0))
{
Json::Value& pri = priorities.append (Json::objectValue);
pri["job_type"] = data.name ();
if (stats.isOverloaded)
pri["over_target"] = true;
if (waiting != 0)
pri["waiting"] = waiting;
if (stats.count != 0)
pri["per_second"] = static_cast<int> (stats.count);
if (stats.latencyPeak != 0)
pri["peak_time"] = static_cast<int> (stats.latencyPeak);
if (stats.latencyAvg != 0)
pri["avg_time"] = static_cast<int> (stats.latencyAvg);
if (running != 0)
pri["in_progress"] = running;
}
}
ret["job_types"] = priorities;
return ret;
}
private:
//--------------------------------------------------------------------------
JobTypeData& getJobTypeData (JobType type)
{
JobDataMap::iterator c (m_jobData.find (type));
assert (c != m_jobData.end ());
// NIKB: This is ugly and I hate it. We must remove jtINVALID completely
// and use something sane.
if (c == m_jobData.end ())
return m_invalidJobData;
return c->second;
}
//--------------------------------------------------------------------------
// Signals the service stopped if the stopped condition is met.
//
void checkStopped (ScopedLock const& lock)
{
// We are stopped when all of the following are true:
//
// 1. A stop notification was received
// 2. All Stoppable children have stopped
// 3. There are no executing calls to processTask
// 4. There are no remaining Jobs in the job set
//
if (isStopping() &&
areChildrenStopped() &&
(m_processCount == 0) &&
m_jobSet.empty())
{
stopped();
}
}
//--------------------------------------------------------------------------
//
// Signals an added Job for processing.
//
// Pre-conditions:
// The JobType must be valid.
// The Job must exist in mJobSet.
// The Job must not have previously been queued.
//
// Post-conditions:
// Count of waiting jobs of that type will be incremented.
// If JobQueue exists, and has at least one thread, Job will eventually run.
//
// Invariants:
// The calling thread owns the JobLock
//
void queueJob (Job const& job, ScopedLock const& lock)
{
JobType const type (job.getType ());
assert (type != jtINVALID);
assert (m_jobSet.find (job) != m_jobSet.end ());
JobTypeData& data (getJobTypeData (type));
if (data.waiting + data.running < getJobLimit (type))
{
m_workers.addTask ();
}
else
{
// defer the task until we go below the limit
//
++data.deferred;
}
++data.waiting;
}
//------------------------------------------------------------------------------
//
// Returns the next Job we should run now.
//
// RunnableJob:
// A Job in the JobSet whose slots count for its type is greater than zero.
//
// Pre-conditions:
// mJobSet must not be empty.
// mJobSet holds at least one RunnableJob
//
// Post-conditions:
// job is a valid Job object.
// job is removed from mJobQueue.
// Waiting job count of it's type is decremented
// Running job count of it's type is incremented
//
// Invariants:
// The calling thread owns the JobLock
//
void getNextJob (Job& job, ScopedLock const& lock)
{
assert (! m_jobSet.empty ());
JobSet::const_iterator iter;
for (iter = m_jobSet.begin (); iter != m_jobSet.end (); ++iter)
{
JobTypeData& data (getJobTypeData (iter->getType ()));
assert (data.running <= getJobLimit (data.type ()));
// Run this job if we're running below the limit.
if (data.running < getJobLimit (data.type ()))
{
assert (data.waiting > 0);
break;
}
}
assert (iter != m_jobSet.end ());
JobType const type = iter->getType ();
JobTypeData& data (getJobTypeData (type));
assert (type != jtINVALID);
job = *iter;
m_jobSet.erase (iter);
--data.waiting;
++data.running;
}
//------------------------------------------------------------------------------
//
// Indicates that a running Job has completed its task.
//
// Pre-conditions:
// Job must not exist in mJobSet.
// The JobType must not be invalid.
//
// Post-conditions:
// The running count of that JobType is decremented
// A new task is signaled if there are more waiting Jobs than the limit, if any.
//
// Invariants:
// <none>
//
void finishJob (Job const& job, ScopedLock const& lock)
{
JobType const type = job.getType ();
assert (m_jobSet.find (job) == m_jobSet.end ());
assert (type != jtINVALID);
JobTypeData& data (getJobTypeData (type));
// Queue a deferred task if possible
if (data.deferred > 0)
{
assert (data.running + data.waiting >= getJobLimit (type));
--data.deferred;
m_workers.addTask ();
}
--data.running;
}
//--------------------------------------------------------------------------
template <class Rep, class Period>
void on_dequeue (JobType type,
std::chrono::duration <Rep, Period> const& value)
{
auto const ms (ceil <std::chrono::milliseconds> (value));
if (ms.count() >= 10)
getJobTypeData (type).dequeue.notify (ms);
}
template <class Rep, class Period>
void on_execute (JobType type,
std::chrono::duration <Rep, Period> const& value)
{
auto const ms (ceil <std::chrono::milliseconds> (value));
if (ms.count() >= 10)
getJobTypeData (type).execute.notify (ms);
}
//--------------------------------------------------------------------------
//
// Runs the next appropriate waiting Job.
//
// Pre-conditions:
// A RunnableJob must exist in the JobSet
//
// Post-conditions:
// The chosen RunnableJob will have Job::doJob() called.
//
// Invariants:
// <none>
//
void processTask ()
{
Job job;
{
ScopedLock lock (m_mutex);
getNextJob (job, lock);
++m_processCount;
}
JobTypeData& data (getJobTypeData (job.getType ()));
// Skip the job if we are stopping and the
// skipOnStop flag is set for the job type
//
if (!isStopping() || !data.info.skip ())
{
beast::Thread::setCurrentThreadName (data.name ());
m_journal.trace << "Doing " << data.name () << " job";
Job::clock_type::time_point const start_time (
Job::clock_type::now());
on_dequeue (job.getType (), start_time - job.queue_time ());
job.doJob ();
on_execute (job.getType (), Job::clock_type::now() - start_time);
}
else
{
m_journal.trace << "Skipping processTask ('" << data.name () << "')";
}
{
ScopedLock lock (m_mutex);
finishJob (job, lock);
--m_processCount;
checkStopped (lock);
}
// Note that when Job::~Job is called, the last reference
// to the associated LoadEvent object (in the Job) may be destroyed.
}
//------------------------------------------------------------------------------
// Returns `true` if all jobs of this type should be skipped when
// the JobQueue receives a stop notification. If the job type isn't
// skipped, the Job will be called and the job must call Job::shouldCancel
// to determine if a long running or non-mandatory operation should be canceled.
bool skipOnStop (JobType type)
{
JobTypeInfo const& j (getJobTypes ().get (type));
assert (j.type () != jtINVALID);
return j.skip ();
}
// Returns the limit of running jobs for the given job type.
// For jobs with no limit, we return the largest int. Hopefully that
// will be enough.
//
int getJobLimit (JobType type)
{
JobTypeInfo const& j (getJobTypes ().get (type));
assert (j.type () != jtINVALID);
return j.limit ();
}
//--------------------------------------------------------------------------
void onStop ()
{
// VFALCO NOTE I wanted to remove all the jobs that are skippable
// but then the Workers count of tasks to process
// goes wrong.
/*
{
ScopedLock lock (m_mutex);
// Remove all jobs whose type is skipOnStop
typedef hash_map <JobType, std::size_t> JobDataMap;
JobDataMap counts;
bool const report (m_journal.debug.active());
for (JobSet::const_iterator iter (m_jobSet.begin());
iter != m_jobSet.end();)
{
if (skipOnStop (iter->getType()))
{
if (report)
{
std::pair <JobDataMap::iterator, bool> result (
counts.insert (std::make_pair (iter->getType(), 1)));
if (! result.second)
++(result.first->second);
}
iter = m_jobSet.erase (iter);
}
else
{
++iter;
}
}
if (report)
{
beast::Journal::ScopedStream s (m_journal.debug);
for (JobDataMap::const_iterator iter (counts.begin());
iter != counts.end(); ++iter)
{
s << std::endl <<
"Removed " << iter->second <<
" skiponStop jobs of type " << Job::toString (iter->first);
}
}
}
*/
}
void onChildrenStopped ()
{
ScopedLock lock (m_mutex);
checkStopped (lock);
}
};
//------------------------------------------------------------------------------
JobQueue::JobQueue (char const* name, Stoppable& parent)
: Stoppable (name, parent)
{
}
//------------------------------------------------------------------------------
std::unique_ptr <JobQueue> make_JobQueue (
beast::insight::Collector::ptr const& collector,
beast::Stoppable& parent, beast::Journal journal)
{
return std::make_unique <JobQueueImp> (collector, parent, journal);
}
}

View File

@@ -0,0 +1,97 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/core/LoadEvent.h>
#include <ripple/core/LoadMonitor.h>
namespace ripple {
LoadEvent::LoadEvent (LoadMonitor& monitor, std::string const& name, bool shouldStart)
: m_loadMonitor (monitor)
, m_isRunning (false)
, m_name (name)
, m_timeStopped (beast::RelativeTime::fromStartup())
, m_secondsWaiting (0)
, m_secondsRunning (0)
{
if (shouldStart)
start ();
}
LoadEvent::~LoadEvent ()
{
if (m_isRunning)
stop ();
}
std::string const& LoadEvent::name () const
{
return m_name;
}
double LoadEvent::getSecondsWaiting() const
{
return m_secondsWaiting;
}
double LoadEvent::getSecondsRunning() const
{
return m_secondsRunning;
}
double LoadEvent::getSecondsTotal() const
{
return m_secondsWaiting + m_secondsRunning;
}
void LoadEvent::reName (std::string const& name)
{
m_name = name;
}
void LoadEvent::start ()
{
beast::RelativeTime const currentTime (beast::RelativeTime::fromStartup());
// If we already called start, this call will replace the previous one.
if (m_isRunning)
{
m_secondsWaiting += (currentTime - m_timeStarted).inSeconds();
}
else
{
m_secondsWaiting += (currentTime - m_timeStopped).inSeconds();
m_isRunning = true;
}
m_timeStarted = currentTime;
}
void LoadEvent::stop ()
{
bassert (m_isRunning);
m_timeStopped = beast::RelativeTime::fromStartup();
m_secondsRunning += (m_timeStopped - m_timeStarted).inSeconds();
m_isRunning = false;
m_loadMonitor.addLoadSample (*this);
}
} // ripple

View File

@@ -0,0 +1,57 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/core/impl/LoadFeeTrackImp.h>
#include <ripple/core/Config.h>
#include <beast/unit_test/suite.h>
namespace ripple {
LoadFeeTrack* LoadFeeTrack::New (beast::Journal journal)
{
return new LoadFeeTrackImp (journal);
}
//------------------------------------------------------------------------------
class LoadFeeTrack_test : public beast::unit_test::suite
{
public:
void run ()
{
Config d; // get a default configuration object
LoadFeeTrackImp l;
expect (l.scaleFeeBase (10000, d.FEE_DEFAULT, d.TRANSACTION_FEE_BASE) == 10000);
expect (l.scaleFeeLoad (10000, d.FEE_DEFAULT, d.TRANSACTION_FEE_BASE, false) == 10000);
expect (l.scaleFeeBase (1, d.FEE_DEFAULT, d.TRANSACTION_FEE_BASE) == 1);
expect (l.scaleFeeLoad (1, d.FEE_DEFAULT, d.TRANSACTION_FEE_BASE, false) == 1);
// Check new default fee values give same fees as old defaults
expect (l.scaleFeeBase (d.FEE_DEFAULT, d.FEE_DEFAULT, d.TRANSACTION_FEE_BASE) == 10);
expect (l.scaleFeeBase (d.FEE_ACCOUNT_RESERVE, d.FEE_DEFAULT, d.TRANSACTION_FEE_BASE) == 200 * SYSTEM_CURRENCY_PARTS);
expect (l.scaleFeeBase (d.FEE_OWNER_RESERVE, d.FEE_DEFAULT, d.TRANSACTION_FEE_BASE) == 50 * SYSTEM_CURRENCY_PARTS);
expect (l.scaleFeeBase (d.FEE_OFFER, d.FEE_DEFAULT, d.TRANSACTION_FEE_BASE) == 10);
expect (l.scaleFeeBase (d.FEE_CONTRACT_OPERATION, d.FEE_DEFAULT, d.TRANSACTION_FEE_BASE) == 1);
}
};
BEAST_DEFINE_TESTSUITE(LoadFeeTrack,ripple_core,ripple);
} // ripple

View File

@@ -0,0 +1,238 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_LOADFEETRACKIMP_H_INCLUDED
#define RIPPLE_LOADFEETRACKIMP_H_INCLUDED
#include <ripple/common/jsonrpc_fields.h>
#include <ripple/core/LoadFeeTrack.h>
#include <mutex>
namespace ripple {
class LoadFeeTrackImp : public LoadFeeTrack
{
public:
explicit LoadFeeTrackImp (beast::Journal journal = beast::Journal())
: m_journal (journal)
, mLocalTxnLoadFee (lftNormalFee)
, mRemoteTxnLoadFee (lftNormalFee)
, mClusterTxnLoadFee (lftNormalFee)
, raiseCount (0)
{
}
// Scale using load as well as base rate
std::uint64_t scaleFeeLoad (std::uint64_t fee, std::uint64_t baseFee, std::uint32_t referenceFeeUnits, bool bAdmin)
{
static std::uint64_t midrange (0x00000000FFFFFFFF);
bool big = (fee > midrange);
if (big) // big fee, divide first to avoid overflow
fee /= baseFee;
else // normal fee, multiply first for accuracy
fee *= referenceFeeUnits;
std::uint32_t feeFactor = std::max (mLocalTxnLoadFee, mRemoteTxnLoadFee);
// Let admins pay the normal fee until the local load exceeds four times the remote
std::uint32_t uRemFee = std::max(mRemoteTxnLoadFee, mClusterTxnLoadFee);
if (bAdmin && (feeFactor > uRemFee) && (feeFactor < (4 * uRemFee)))
feeFactor = uRemFee;
{
ScopedLockType sl (mLock);
fee = mulDiv (fee, feeFactor, lftNormalFee);
}
if (big) // Fee was big to start, must now multiply
fee *= referenceFeeUnits;
else // Fee was small to start, mst now divide
fee /= baseFee;
return fee;
}
// Scale from fee units to millionths of a ripple
std::uint64_t scaleFeeBase (std::uint64_t fee, std::uint64_t baseFee, std::uint32_t referenceFeeUnits)
{
return mulDiv (fee, referenceFeeUnits, baseFee);
}
std::uint32_t getRemoteFee ()
{
ScopedLockType sl (mLock);
return mRemoteTxnLoadFee;
}
std::uint32_t getLocalFee ()
{
ScopedLockType sl (mLock);
return mLocalTxnLoadFee;
}
std::uint32_t getLoadBase ()
{
return lftNormalFee;
}
std::uint32_t getLoadFactor ()
{
ScopedLockType sl (mLock);
return std::max(mClusterTxnLoadFee, std::max (mLocalTxnLoadFee, mRemoteTxnLoadFee));
}
void setClusterFee (std::uint32_t fee)
{
ScopedLockType sl (mLock);
mClusterTxnLoadFee = fee;
}
std::uint32_t getClusterFee ()
{
ScopedLockType sl (mLock);
return mClusterTxnLoadFee;
}
bool isLoadedLocal ()
{
// VFALCO TODO This could be replaced with a SharedData and
// using a read/write lock instead of a critical section.
//
// NOTE This applies to all the locking in this class.
//
//
ScopedLockType sl (mLock);
return (raiseCount != 0) || (mLocalTxnLoadFee != lftNormalFee);
}
bool isLoadedCluster ()
{
// VFALCO TODO This could be replaced with a SharedData and
// using a read/write lock instead of a critical section.
//
// NOTE This applies to all the locking in this class.
//
//
ScopedLockType sl (mLock);
return (raiseCount != 0) || (mLocalTxnLoadFee != lftNormalFee) || (mClusterTxnLoadFee != lftNormalFee);
}
void setRemoteFee (std::uint32_t f)
{
ScopedLockType sl (mLock);
mRemoteTxnLoadFee = f;
}
bool raiseLocalFee ()
{
ScopedLockType sl (mLock);
if (++raiseCount < 2)
return false;
std::uint32_t origFee = mLocalTxnLoadFee;
if (mLocalTxnLoadFee < mRemoteTxnLoadFee) // make sure this fee takes effect
mLocalTxnLoadFee = mRemoteTxnLoadFee;
mLocalTxnLoadFee += (mLocalTxnLoadFee / lftFeeIncFraction); // increment by 1/16th
if (mLocalTxnLoadFee > lftFeeMax)
mLocalTxnLoadFee = lftFeeMax;
if (origFee == mLocalTxnLoadFee)
return false;
m_journal.debug << "Local load fee raised from " << origFee << " to " << mLocalTxnLoadFee;
return true;
}
bool lowerLocalFee ()
{
ScopedLockType sl (mLock);
std::uint32_t origFee = mLocalTxnLoadFee;
raiseCount = 0;
mLocalTxnLoadFee -= (mLocalTxnLoadFee / lftFeeDecFraction ); // reduce by 1/4
if (mLocalTxnLoadFee < lftNormalFee)
mLocalTxnLoadFee = lftNormalFee;
if (origFee == mLocalTxnLoadFee)
return false;
m_journal.debug << "Local load fee lowered from " << origFee << " to " << mLocalTxnLoadFee;
return true;
}
Json::Value getJson (std::uint64_t baseFee, std::uint32_t referenceFeeUnits)
{
Json::Value j (Json::objectValue);
{
ScopedLockType sl (mLock);
// base_fee = The cost to send a "reference" transaction under no load, in millionths of a Ripple
j[jss::base_fee] = Json::Value::UInt (baseFee);
// load_fee = The cost to send a "reference" transaction now, in millionths of a Ripple
j[jss::load_fee] = Json::Value::UInt (
mulDiv (baseFee, std::max (mLocalTxnLoadFee, mRemoteTxnLoadFee), lftNormalFee));
}
return j;
}
private:
// VFALCO TODO Move this function to some "math utilities" file
// compute (value)*(mul)/(div) - avoid overflow but keep precision
std::uint64_t mulDiv (std::uint64_t value, std::uint32_t mul, std::uint64_t div)
{
// VFALCO TODO replace with beast::literal64bitUnsigned ()
//
static std::uint64_t boundary = (0x00000000FFFFFFFF);
if (value > boundary) // Large value, avoid overflow
return (value / div) * mul;
else // Normal value, preserve accuracy
return (value * mul) / div;
}
private:
static const int lftNormalFee = 256; // 256 is the minimum/normal load factor
static const int lftFeeIncFraction = 4; // increase fee by 1/4
static const int lftFeeDecFraction = 4; // decrease fee by 1/4
static const int lftFeeMax = lftNormalFee * 1000000;
beast::Journal m_journal;
typedef std::mutex LockType;
typedef std::lock_guard <LockType> ScopedLockType;
LockType mLock;
std::uint32_t mLocalTxnLoadFee; // Scale factor, lftNormalFee = normal fee
std::uint32_t mRemoteTxnLoadFee; // Scale factor, lftNormalFee = normal fee
std::uint32_t mClusterTxnLoadFee; // Scale factor, lftNormalFee = normal fee
int raiseCount;
};
} // ripple
#endif

View File

@@ -0,0 +1,243 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/basics/Log.h>
#include <ripple/basics/UptimeTimer.h>
#include <ripple/core/LoadMonitor.h>
namespace ripple {
/*
TODO
----
- Use Journal for logging
*/
//------------------------------------------------------------------------------
LoadMonitor::Stats::Stats()
: count (0)
, latencyAvg (0)
, latencyPeak (0)
, isOverloaded (false)
{
}
//------------------------------------------------------------------------------
LoadMonitor::LoadMonitor ()
: mCounts (0)
, mLatencyEvents (0)
, mLatencyMSAvg (0)
, mLatencyMSPeak (0)
, mTargetLatencyAvg (0)
, mTargetLatencyPk (0)
, mLastUpdate (UptimeTimer::getInstance ().getElapsedSeconds ())
{
}
// VFALCO NOTE WHY do we need "the mutex?" This dependence on
// a hidden global, especially a synchronization primitive,
// is a flawed design.
// It's not clear exactly which data needs to be protected.
//
// call with the mutex
void LoadMonitor::update ()
{
int now = UptimeTimer::getInstance ().getElapsedSeconds ();
// VFALCO TODO stop returning from the middle of functions.
if (now == mLastUpdate) // current
return;
// VFALCO TODO Why 8?
if ((now < mLastUpdate) || (now > (mLastUpdate + 8)))
{
// way out of date
mCounts = 0;
mLatencyEvents = 0;
mLatencyMSAvg = 0;
mLatencyMSPeak = 0;
mLastUpdate = now;
// VFALCO TODO don't return from the middle...
return;
}
// do exponential decay
/*
David:
"Imagine if you add 10 to something every second. And you
also reduce it by 1/4 every second. It will "idle" at 40,
correponding to 10 counts per second."
*/
do
{
++mLastUpdate;
mCounts -= ((mCounts + 3) / 4);
mLatencyEvents -= ((mLatencyEvents + 3) / 4);
mLatencyMSAvg -= (mLatencyMSAvg / 4);
mLatencyMSPeak -= (mLatencyMSPeak / 4);
}
while (mLastUpdate < now);
}
void LoadMonitor::addCount ()
{
ScopedLockType sl (mLock);
update ();
++mCounts;
}
void LoadMonitor::addLatency (int latency)
{
// VFALCO NOTE Why does 1 become 0?
if (latency == 1)
latency = 0;
ScopedLockType sl (mLock);
update ();
++mLatencyEvents;
mLatencyMSAvg += latency;
mLatencyMSPeak += latency;
// Units are quarters of a millisecond
int const latencyPeak = mLatencyEvents * latency * 4;
if (mLatencyMSPeak < latencyPeak)
mLatencyMSPeak = latencyPeak;
}
std::string LoadMonitor::printElapsed (double seconds)
{
std::stringstream ss;
ss << (std::size_t (seconds * 1000 + 0.5)) << " ms";
return ss.str();
}
void LoadMonitor::addLoadSample (LoadEvent const& sample)
{
std::string const& name (sample.name());
beast::RelativeTime const latency (sample.getSecondsTotal());
if (latency.inSeconds() > 0.5)
{
WriteLog ((latency.inSeconds() > 1.0) ? lsWARNING : lsINFO, LoadMonitor)
<< "Job: " << name << " ExecutionTime: " << printElapsed (sample.getSecondsRunning()) <<
" WaitingTime: " << printElapsed (sample.getSecondsWaiting());
}
// VFALCO NOTE Why does 1 become 0?
std::size_t latencyMilliseconds (latency.inMilliseconds());
if (latencyMilliseconds == 1)
latencyMilliseconds = 0;
ScopedLockType sl (mLock);
update ();
++mCounts;
++mLatencyEvents;
mLatencyMSAvg += latencyMilliseconds;
mLatencyMSPeak += latencyMilliseconds;
// VFALCO NOTE Why are we multiplying by 4?
int const latencyPeak = mLatencyEvents * latencyMilliseconds * 4;
if (mLatencyMSPeak < latencyPeak)
mLatencyMSPeak = latencyPeak;
}
/* Add multiple samples
@param count The number of samples to add
@param latencyMS The total number of milliseconds
*/
void LoadMonitor::addSamples (int count, std::chrono::milliseconds latency)
{
ScopedLockType sl (mLock);
update ();
mCounts += count;
mLatencyEvents += count;
mLatencyMSAvg += latency.count();
mLatencyMSPeak += latency.count();
int const latencyPeak = mLatencyEvents * latency.count() * 4 / count;
if (mLatencyMSPeak < latencyPeak)
mLatencyMSPeak = latencyPeak;
}
void LoadMonitor::setTargetLatency (std::uint64_t avg, std::uint64_t pk)
{
mTargetLatencyAvg = avg;
mTargetLatencyPk = pk;
}
bool LoadMonitor::isOverTarget (std::uint64_t avg, std::uint64_t peak)
{
return (mTargetLatencyPk && (peak > mTargetLatencyPk)) ||
(mTargetLatencyAvg && (avg > mTargetLatencyAvg));
}
bool LoadMonitor::isOver ()
{
ScopedLockType sl (mLock);
update ();
if (mLatencyEvents == 0)
return 0;
return isOverTarget (mLatencyMSAvg / (mLatencyEvents * 4), mLatencyMSPeak / (mLatencyEvents * 4));
}
LoadMonitor::Stats LoadMonitor::getStats ()
{
Stats stats;
ScopedLockType sl (mLock);
update ();
stats.count = mCounts / 4;
if (mLatencyEvents == 0)
{
stats.latencyAvg = 0;
stats.latencyPeak = 0;
}
else
{
stats.latencyAvg = mLatencyMSAvg / (mLatencyEvents * 4);
stats.latencyPeak = mLatencyMSPeak / (mLatencyEvents * 4);
}
stats.isOverloaded = isOverTarget (stats.latencyAvg, stats.latencyPeak);
return stats;
}
} // ripple