Compare commits

...

2 Commits

Author SHA1 Message Date
Denis Angell
80b4c941f0 update headers 2024-07-04 10:29:43 +02:00
Denis Angell
7b5ed1c7c4 rough draft 2024-06-06 12:38:20 +02:00
12 changed files with 185 additions and 46 deletions

View File

@@ -588,6 +588,7 @@ target_sources (rippled PRIVATE
src/ripple/resource/impl/Consumer.cpp src/ripple/resource/impl/Consumer.cpp
src/ripple/resource/impl/Fees.cpp src/ripple/resource/impl/Fees.cpp
src/ripple/resource/impl/ResourceManager.cpp src/ripple/resource/impl/ResourceManager.cpp
src/ripple/resource/impl/Tuning.cpp
#[===============================[ #[===============================[
main sources: main sources:
subdir: rpc subdir: rpc

View File

@@ -23,7 +23,9 @@
# #
# 9. Misc Settings # 9. Misc Settings
# #
# 10. Example Settings # 10. Resource Settings
#
# 11. Example Settings
# #
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
# #
@@ -1565,7 +1567,41 @@
# #
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
# #
# 10. Example Settings # 10. Resource Settings
#
#--------------------
# [resource]
#
# A set of key/value pair parameters to tune the performance of the
# transaction queue.
#
# warning_threshold = <number>
#
# Lorem Epsium....
#
# drop_threshold = <number>
#
# Lorem Epsium....
#
# decay_window_seconds = <number>
#
# Lorem Epsium....
#
# minimum_gossip_balance = <number>
#
# Lorem Epsium....
#
# seconds_until_expiration = <number>
#
# Lorem Epsium....
#
# gossip_expiration_seconds = <number>
#
# Lorem Epsium....
#
#
#
# 11. Example Settings
# #
#-------------------- #--------------------
# #

View File

@@ -352,6 +352,7 @@ public:
, validatorKeys_(*config_, m_journal) , validatorKeys_(*config_, m_journal)
, m_resourceManager(Resource::make_Manager( , m_resourceManager(Resource::make_Manager(
config_->section("resource"),
m_collectorManager->collector(), m_collectorManager->collector(),
logs_->journal("Resource"))) logs_->journal("Resource")))

View File

@@ -101,6 +101,7 @@ struct ConfigSection
#define SECTION_SWEEP_INTERVAL "sweep_interval" #define SECTION_SWEEP_INTERVAL "sweep_interval"
#define SECTION_NETWORK_ID "network_id" #define SECTION_NETWORK_ID "network_id"
#define SECTION_IMPORT_VL_KEYS "import_vl_keys" #define SECTION_IMPORT_VL_KEYS "import_vl_keys"
#define SECTION_RESOURCE "resource"
} // namespace ripple } // namespace ripple

View File

@@ -80,6 +80,7 @@ public:
std::unique_ptr<Manager> std::unique_ptr<Manager>
make_Manager( make_Manager(
Section const& section,
beast::insight::Collector::ptr const& collector, beast::insight::Collector::ptr const& collector,
beast::Journal journal); beast::Journal journal);

View File

@@ -89,7 +89,7 @@ struct Entry : public beast::List<Entry>::Node
int refcount; int refcount;
// Exponentially decaying balance of resource consumption // Exponentially decaying balance of resource consumption
DecayingSample<decayWindowSeconds, clock_type> local_balance; DecayingSample<Tuning::getDecayWindowSeconds(), clock_type> local_balance;
// Normalized balance contribution from imports // Normalized balance contribution from imports
int remote_balance; int remote_balance;

View File

@@ -31,6 +31,7 @@
#include <ripple/resource/Fees.h> #include <ripple/resource/Fees.h>
#include <ripple/resource/Gossip.h> #include <ripple/resource/Gossip.h>
#include <ripple/resource/impl/Import.h> #include <ripple/resource/impl/Import.h>
#include <ripple/resource/impl/Tuning.h>
#include <cassert> #include <cassert>
#include <mutex> #include <mutex>
@@ -88,11 +89,27 @@ private:
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
public: public:
Logic( Logic(
Section const& section,
beast::insight::Collector::ptr const& collector, beast::insight::Collector::ptr const& collector,
clock_type& clock, clock_type& clock,
beast::Journal journal) beast::Journal journal)
: m_stats(collector), m_clock(clock), m_journal(journal) : m_stats(collector), m_clock(clock), m_journal(journal)
{ {
std::uint32_t warningThreshold;
if (get_if_exists(section, "warning_threshold", warningThreshold))
Tuning::warningThreshold = warningThreshold;
std::uint32_t dropThreshold;
if (get_if_exists(section, "drop_threshold", dropThreshold))
Tuning::dropThreshold = dropThreshold;
// std::uint32_t decayWindowSeconds;
// if (get_if_exists(section, "decay_window_seconds", decayWindowSeconds))
// Tuning::decayWindowSeconds = decayWindowSeconds;
std::uint32_t minimumGossipBalance;
if (get_if_exists(section, "minimum_gossip_balance", minimumGossipBalance))
Tuning::minimumGossipBalance = minimumGossipBalance;
} }
~Logic() ~Logic()
@@ -200,7 +217,7 @@ public:
Json::Value Json::Value
getJson() getJson()
{ {
return getJson(warningThreshold); return getJson(Tuning::warningThreshold);
} }
/** Returns a Json::objectValue. */ /** Returns a Json::objectValue. */
@@ -266,7 +283,7 @@ public:
{ {
Gossip::Item item; Gossip::Item item;
item.balance = inboundEntry.local_balance.value(now); item.balance = inboundEntry.local_balance.value(now);
if (item.balance >= minimumGossipBalance) if (item.balance >= Tuning::minimumGossipBalance)
{ {
item.address = inboundEntry.key->address; item.address = inboundEntry.key->address;
gossip.items.push_back(item); gossip.items.push_back(item);
@@ -294,7 +311,7 @@ public:
{ {
// This is a new import // This is a new import
Import& next(resultIt->second); Import& next(resultIt->second);
next.whenExpires = elapsed + gossipExpirationSeconds; next.whenExpires = elapsed + Tuning::gossipExpirationSeconds;
next.items.reserve(gossip.items.size()); next.items.reserve(gossip.items.size());
for (auto const& gossipItem : gossip.items) for (auto const& gossipItem : gossip.items)
@@ -312,7 +329,7 @@ public:
// balances and then deduct the old remote balances. // balances and then deduct the old remote balances.
Import next; Import next;
next.whenExpires = elapsed + gossipExpirationSeconds; next.whenExpires = elapsed + Tuning::gossipExpirationSeconds;
next.items.reserve(gossip.items.size()); next.items.reserve(gossip.items.size());
for (auto const& gossipItem : gossip.items) for (auto const& gossipItem : gossip.items)
{ {
@@ -387,10 +404,10 @@ public:
static Disposition static Disposition
disposition(int balance) disposition(int balance)
{ {
if (balance >= dropThreshold) if (balance >= Tuning::dropThreshold)
return Disposition::drop; return Disposition::drop;
if (balance >= warningThreshold) if (balance >= Tuning::warningThreshold)
return Disposition::warn; return Disposition::warn;
return Disposition::ok; return Disposition::ok;
@@ -437,7 +454,7 @@ public:
break; break;
} }
inactive_.push_back(entry); inactive_.push_back(entry);
entry.whenExpires = m_clock.now() + secondsUntilExpiration; entry.whenExpires = m_clock.now() + Tuning::secondsUntilExpiration;
} }
} }
@@ -460,7 +477,7 @@ public:
std::lock_guard _(lock_); std::lock_guard _(lock_);
bool notify(false); bool notify(false);
auto const elapsed = m_clock.now(); auto const elapsed = m_clock.now();
if (entry.balance(m_clock.now()) >= warningThreshold && if (entry.balance(m_clock.now()) >= Tuning::warningThreshold &&
elapsed != entry.lastWarningTime) elapsed != entry.lastWarningTime)
{ {
charge(entry, feeWarning); charge(entry, feeWarning);
@@ -485,11 +502,11 @@ public:
bool drop(false); bool drop(false);
clock_type::time_point const now(m_clock.now()); clock_type::time_point const now(m_clock.now());
int const balance(entry.balance(now)); int const balance(entry.balance(now));
if (balance >= dropThreshold) if (balance >= Tuning::dropThreshold)
{ {
JLOG(m_journal.warn()) JLOG(m_journal.warn())
<< "Consumer entry " << entry << " dropped with balance " << "Consumer entry " << entry << " dropped with balance "
<< balance << " at or above drop threshold " << dropThreshold; << balance << " at or above drop threshold " << Tuning::dropThreshold;
// Adding feeDrop at this point keeps the dropped connection // Adding feeDrop at this point keeps the dropped connection
// from re-connecting for at least a little while after it is // from re-connecting for at least a little while after it is

View File

@@ -45,9 +45,10 @@ private:
public: public:
ManagerImp( ManagerImp(
Section const& section,
beast::insight::Collector::ptr const& collector, beast::insight::Collector::ptr const& collector,
beast::Journal journal) beast::Journal journal)
: journal_(journal), logic_(collector, stopwatch(), journal) : journal_(journal), logic_(section, collector, stopwatch(), journal)
{ {
thread_ = std::thread{&ManagerImp::run, this}; thread_ = std::thread{&ManagerImp::run, this};
} }
@@ -173,10 +174,11 @@ Manager::~Manager() = default;
std::unique_ptr<Manager> std::unique_ptr<Manager>
make_Manager( make_Manager(
Section const& section,
beast::insight::Collector::ptr const& collector, beast::insight::Collector::ptr const& collector,
beast::Journal journal) beast::Journal journal)
{ {
return std::make_unique<ManagerImp>(collector, journal); return std::make_unique<ManagerImp>(section, collector, journal);
} }
} // namespace Resource } // namespace Resource

View File

@@ -0,0 +1,34 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "Tuning.h"
namespace ripple {
namespace Resource {
uint32_t Tuning::warningThreshold = 5000;
uint32_t Tuning::dropThreshold = 15000;
// uint32_t Tuning::decayWindowSeconds = 32;
uint32_t Tuning::minimumGossipBalance = 1000;
std::chrono::seconds constexpr Tuning::secondsUntilExpiration{300};
std::chrono::seconds constexpr Tuning::gossipExpirationSeconds{30};
} // namespace Resource
} // namespace ripple

View File

@@ -17,6 +17,7 @@
*/ */
//============================================================================== //==============================================================================
// Tuning.h
#ifndef RIPPLE_RESOURCE_TUNING_H_INCLUDED #ifndef RIPPLE_RESOURCE_TUNING_H_INCLUDED
#define RIPPLE_RESOURCE_TUNING_H_INCLUDED #define RIPPLE_RESOURCE_TUNING_H_INCLUDED
@@ -25,31 +26,23 @@
namespace ripple { namespace ripple {
namespace Resource { namespace Resource {
/** Tunable constants. */ class Tuning
enum { {
// Balance at which a warning is issued public:
warningThreshold = 5000 static std::uint32_t warningThreshold;
static std::uint32_t dropThreshold;
// static std::uint32_t decayWindowSeconds;
static std::uint32_t minimumGossipBalance;
// Balance at which the consumer is disconnected static std::chrono::seconds const secondsUntilExpiration;
, static std::chrono::seconds const gossipExpirationSeconds;
dropThreshold = 15000
// The number of seconds in the exponential decay window static constexpr std::uint32_t getDecayWindowSeconds()
// (This should be a power of two) {
, return 32;
decayWindowSeconds = 32 }
// The minimum balance required in order to include a load source in gossip
,
minimumGossipBalance = 1000
}; };
// The number of seconds until an inactive table item is removed
std::chrono::seconds constexpr secondsUntilExpiration{300};
// Number of seconds until imported gossip expires
std::chrono::seconds constexpr gossipExpirationSeconds{30};
} // namespace Resource } // namespace Resource
} // namespace ripple } // namespace ripple

View File

@@ -23,6 +23,7 @@
#include <ripple/resource/Consumer.h> #include <ripple/resource/Consumer.h>
#include <ripple/resource/impl/Entry.h> #include <ripple/resource/impl/Entry.h>
#include <ripple/resource/impl/Logic.h> #include <ripple/resource/impl/Logic.h>
#include <ripple/resource/impl/Tuning.h>
#include <test/unit_test/SuiteJournal.h> #include <test/unit_test/SuiteJournal.h>
#include <boost/utility/base_from_member.hpp> #include <boost/utility/base_from_member.hpp>
@@ -42,8 +43,8 @@ public:
using clock_type = boost::base_from_member<TestStopwatch>; using clock_type = boost::base_from_member<TestStopwatch>;
public: public:
explicit TestLogic(beast::Journal journal) explicit TestLogic(Section const& section, beast::Journal journal)
: Logic(beast::insight::NullCollector::New(), member, journal) : Logic(section, beast::insight::NullCollector::New(), member, journal)
{ {
} }
@@ -89,9 +90,13 @@ public:
else else
testcase("Unlimited warn/drop"); testcase("Unlimited warn/drop");
TestLogic logic(j); auto const config = test::jtx::envconfig([](std::unique_ptr<Config> cfg) {
return cfg;
});
Charge const fee(dropThreshold + 1); TestLogic logic(config->section("resource"), j);
Charge const fee(Tuning::dropThreshold + 1);
beast::IP::Endpoint const addr( beast::IP::Endpoint const addr(
beast::IP::Endpoint::from_string("192.0.2.2")); beast::IP::Endpoint::from_string("192.0.2.2"));
@@ -173,7 +178,7 @@ public:
using namespace std::chrono_literals; using namespace std::chrono_literals;
// Give Consumer time to become readmitted. Should never // Give Consumer time to become readmitted. Should never
// exceed expiration time. // exceed expiration time.
auto n = secondsUntilExpiration + 1s; auto n = Tuning::secondsUntilExpiration + 1s;
while (--n > 0s) while (--n > 0s)
{ {
++logic.clock(); ++logic.clock();
@@ -199,7 +204,11 @@ public:
{ {
testcase("Imports"); testcase("Imports");
TestLogic logic(j); auto const config = test::jtx::envconfig([](std::unique_ptr<Config> cfg) {
return cfg;
});
TestLogic logic(config->section("resource"), j);
Gossip g[5]; Gossip g[5];
@@ -217,7 +226,11 @@ public:
{ {
testcase("Import"); testcase("Import");
TestLogic logic(j); auto const config = test::jtx::envconfig([](std::unique_ptr<Config> cfg) {
return cfg;
});
TestLogic logic(config->section("resource"), j);
Gossip g; Gossip g;
Gossip::Item item; Gossip::Item item;
@@ -236,7 +249,11 @@ public:
{ {
testcase("Charge"); testcase("Charge");
TestLogic logic(j); auto const config = test::jtx::envconfig([](std::unique_ptr<Config> cfg) {
return cfg;
});
TestLogic logic(config->section("resource"), j);
{ {
beast::IP::Endpoint address( beast::IP::Endpoint address(
@@ -275,6 +292,41 @@ public:
pass(); pass();
} }
void
testConfig(beast::Journal j)
{
std::cout << "warningThreshold: " << Tuning::warningThreshold << "\n";
std::cout << "dropThreshold: " << Tuning::dropThreshold << "\n";
std::cout << "getDecayWindowSeconds: " << Tuning::getDecayWindowSeconds() << "\n";
std::cout << "minimumGossipBalance: " << Tuning::minimumGossipBalance << "\n";
BEAST_EXPECT(Tuning::warningThreshold == 5000);
BEAST_EXPECT(Tuning::dropThreshold == 15000);
BEAST_EXPECT(Tuning::getDecayWindowSeconds() == 32);
BEAST_EXPECT(Tuning::minimumGossipBalance == 1000);
BEAST_EXPECT(Tuning::secondsUntilExpiration == std::chrono::seconds{300});
BEAST_EXPECT(Tuning::gossipExpirationSeconds == std::chrono::seconds{30});
auto const config = test::jtx::envconfig([](std::unique_ptr<Config> cfg) {
cfg->section("resource").set("warning_threshold", "15000");
cfg->section("resource").set("drop_threshold", "25000");
cfg->section("resource").set("minimum_gossip_balance", "2000");
cfg->section("resource").set("seconds_until_expiration", "600");
cfg->section("resource").set("gossip_expiration_seconds", "60");
return cfg;
});
TestLogic logic(config->section("resource"), j);
BEAST_EXPECT(Tuning::warningThreshold == 15000);
BEAST_EXPECT(Tuning::dropThreshold == 25000);
BEAST_EXPECT(Tuning::getDecayWindowSeconds() == 32);
BEAST_EXPECT(Tuning::minimumGossipBalance == 2000);
// BEAST_EXPECT(Tuning::secondsUntilExpiration == 600);
// BEAST_EXPECT(Tuning::gossipExpirationSeconds == 60);
}
void void
run() override run() override
{ {
@@ -286,6 +338,7 @@ public:
testCharges(journal); testCharges(journal);
testImports(journal); testImports(journal);
testImport(journal); testImport(journal);
testConfig(journal);
} }
}; };

View File

@@ -276,11 +276,11 @@ class NoRippleCheckLimits_test : public beast::unit_test::suite
Endpoint::from_string(test::getEnvLocalhostAddr())); Endpoint::from_string(test::getEnvLocalhostAddr()));
// if we go above the warning threshold, reset // if we go above the warning threshold, reset
if (c.balance() > warningThreshold) if (c.balance() > Tuning::warningThreshold)
{ {
using ct = beast::abstract_clock<steady_clock>; using ct = beast::abstract_clock<steady_clock>;
c.entry().local_balance = c.entry().local_balance =
DecayingSample<decayWindowSeconds, ct>{steady_clock::now()}; DecayingSample<Tuning::getDecayWindowSeconds(), ct>{steady_clock::now()};
} }
}; };