Adjust default number of threads for JobQueue:

The existing calculation would limit the maximum number of threads
that would be created by default to at most 6; this may have been
reasonable a few years ago, but given both the load on the network
as of today and the increase in the number of CPU cores, the value
should be revisited.

This commit, if merged, changes the default calculation for nodes
that are configured as `large` or `huge` to allow for up to twelve
threads.
This commit is contained in:
Nik Bougalis
2021-11-09 02:04:34 -08:00
parent eaff0d30fb
commit eb6b79bed7
8 changed files with 90 additions and 57 deletions

View File

@@ -87,7 +87,8 @@ RCLValidatedLedger::operator[](Seq const& s) const -> ID
JLOG(j_.warn()) << "Unable to determine hash of ancestor seq=" << s
<< " from ledger hash=" << ledgerID_
<< " seq=" << ledgerSeq_;
<< " seq=" << ledgerSeq_ << " (available: " << minSeq()
<< "-" << seq() << ")";
// Default ID that is less than all others
return ID{0};
}

View File

@@ -283,6 +283,29 @@ public:
logs_->journal("Collector")))
, m_jobQueue(std::make_unique<JobQueue>(
[](std::unique_ptr<Config> const& config) {
if (config->standalone() && !config->reporting() &&
!config->FORCE_MULTI_THREAD)
return 1;
if (config->WORKERS)
return config->WORKERS;
auto count =
static_cast<int>(std::thread::hardware_concurrency());
// Be more aggressive about the number of threads to use
// for the job queue if the server is configured as "large"
// or "huge" if there are enough cores.
if (config->NODE_SIZE >= 4 && count >= 16)
count = 6 + std::min(count, 8);
else if (config->NODE_SIZE >= 3 && count >= 8)
count = 4 + std::min(count, 6);
else
count = 2 + std::min(count, 4);
return count;
}(config_),
m_collectorManager->group("jobq"),
logs_->journal("JobQueue"),
*logs_,
@@ -305,6 +328,7 @@ public:
logs_->journal("TaggedCache"))
, cachedSLEs_(std::chrono::minutes(1), stopwatch())
, validatorKeys_(*config_, m_journal)
, m_resourceManager(Resource::make_Manager(
@@ -1222,9 +1246,6 @@ ApplicationImp::setup()
// Optionally turn off logging to console.
logs_->silent(config_->silent());
m_jobQueue->setThreadCount(
config_->WORKERS, config_->standalone() && !config_->reporting());
if (!config_->standalone())
timeKeeper_->run(config_->SNTP_SERVERS);

View File

@@ -202,7 +202,9 @@ public:
std::chrono::seconds AMENDMENT_MAJORITY_TIME = defaultAmendmentMajorityTime;
// Thread pool configuration
std::size_t WORKERS = 0;
int WORKERS = 0;
// Can only be set in code, specifically unit tests
bool FORCE_MULTI_THREAD = false;
// Normally the sweep timer is automatically deduced based on the node
// size, but we allow admins to explicitly set it in the config.

View File

@@ -141,6 +141,7 @@ public:
using JobFunction = std::function<void(Job&)>;
JobQueue(
int threadCount,
beast::insight::Collector::ptr const& collector,
beast::Journal journal,
Logs& logs,
@@ -200,11 +201,6 @@ public:
int
getJobCountGE(JobType t) const;
/** Set the number of thread serving the job queue to precisely this number.
*/
void
setThreadCount(int c, bool const standaloneMode);
/** Return a scoped LoadEvent.
*/
std::unique_ptr<LoadEvent>

View File

@@ -637,7 +637,14 @@ Config::loadFromString(std::string const& fileContents)
}
if (getSingleSection(secConfig, SECTION_WORKERS, strTemp, j_))
WORKERS = beast::lexicalCastThrow<std::size_t>(strTemp);
{
WORKERS = beast::lexicalCastThrow<int>(strTemp);
if (WORKERS < 1 || WORKERS > 1024)
Throw<std::runtime_error>(
"Invalid " SECTION_WORKERS
": must be between 1 and 1024 inclusive.");
}
if (getSingleSection(secConfig, SECTION_COMPRESSION, strTemp, j_))
COMPRESSION = beast::lexicalCastThrow<bool>(strTemp);

View File

@@ -25,6 +25,7 @@
namespace ripple {
JobQueue::JobQueue(
int threadCount,
beast::insight::Collector::ptr const& collector,
beast::Journal journal,
Logs& logs,
@@ -33,11 +34,13 @@ JobQueue::JobQueue(
, m_lastJob(0)
, m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
, m_processCount(0)
, m_workers(*this, &perfLog, "JobQueue", 0)
, m_workers(*this, &perfLog, "JobQueue", threadCount)
, m_cancelCallback(std::bind(&JobQueue::isStopping, this))
, perfLog_(perfLog)
, m_collector(collector)
{
JLOG(m_journal.info()) << "Using " << threadCount << " threads";
hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
job_count = m_collector->make_gauge("job_count");
@@ -139,29 +142,6 @@ JobQueue::getJobCountGE(JobType t) const
return ret;
}
void
JobQueue::setThreadCount(int c, bool const standaloneMode)
{
if (standaloneMode)
{
c = 1;
}
else if (c == 0)
{
c = static_cast<int>(std::thread::hardware_concurrency());
c = 2 + std::min(c, 4); // I/O will bottleneck
JLOG(m_journal.info()) << "Auto-tuning to " << c
<< " validation/transaction/proposal threads.";
}
else
{
JLOG(m_journal.info()) << "Configured " << c
<< " validation/transaction/proposal threads.";
}
m_workers.setNumberOfThreads(c);
}
std::unique_ptr<LoadEvent>
JobQueue::makeLoadEvent(JobType t, std::string const& name)
{

View File

@@ -28,6 +28,7 @@ struct Transaction_ordering_test : public beast::unit_test::suite
testCorrectOrder()
{
using namespace jtx;
testcase("Correct Order");
Env env(*this);
auto const alice = Account("alice");
@@ -69,8 +70,13 @@ struct Transaction_ordering_test : public beast::unit_test::suite
{
using namespace jtx;
Env env(*this);
env.app().getJobQueue().setThreadCount(0, false);
testcase("Incorrect order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = false;
return cfg;
}));
auto const alice = Account("alice");
env.fund(XRP(1000), noripple(alice));
@@ -109,8 +115,13 @@ struct Transaction_ordering_test : public beast::unit_test::suite
{
using namespace jtx;
Env env(*this);
env.app().getJobQueue().setThreadCount(0, false);
testcase("Incorrect order multiple intermediaries");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
auto const alice = Account("alice");
env.fund(XRP(1000), noripple(alice));

View File

@@ -63,17 +63,23 @@ public:
{
using namespace std::chrono_literals;
using namespace jtx;
Env env(*this);
auto& jq = env.app().getJobQueue();
jq.setThreadCount(0, false);
testcase("correct order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g1, g2;
std::shared_ptr<JobQueue::Coro> c;
jq.postCoro(jtCLIENT, "Coroutine-Test", [&](auto const& cr) {
c = cr;
g1.signal();
c->yield();
g2.signal();
});
env.app().getJobQueue().postCoro(
jtCLIENT, "Coroutine-Test", [&](auto const& cr) {
c = cr;
g1.signal();
c->yield();
g2.signal();
});
BEAST_EXPECT(g1.wait_for(5s));
c->join();
c->post();
@@ -85,15 +91,21 @@ public:
{
using namespace std::chrono_literals;
using namespace jtx;
Env env(*this);
auto& jq = env.app().getJobQueue();
jq.setThreadCount(0, false);
testcase("incorrect order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
jq.postCoro(jtCLIENT, "Coroutine-Test", [&](auto const& c) {
c->post();
c->yield();
g.signal();
});
env.app().getJobQueue().postCoro(
jtCLIENT, "Coroutine-Test", [&](auto const& c) {
c->post();
c->yield();
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
}
@@ -102,9 +114,12 @@ public:
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("thread specific storage");
Env env(*this);
auto& jq = env.app().getJobQueue();
jq.setThreadCount(0, true);
static int const N = 4;
std::array<std::shared_ptr<JobQueue::Coro>, N> a;