From eb6b79bed776ed957ea91dbb88cf5c96da54c7de Mon Sep 17 00:00:00 2001
From: Nik Bougalis
Date: Tue, 9 Nov 2021 02:04:34 -0800
Subject: [PATCH] Adjust default number of threads for JobQueue:

The existing calculation would limit the maximum number of threads
that would be created by default to at most 6; this may have been
reasonable a few years ago, but given both the load on the network
as of today and the increase in the number of CPU cores, the value
should be revisited.

This commit, if merged, changes the default calculation for nodes
that are configured as `large` or `huge` to allow for up to twelve
threads.
---
Reviewer notes (below the "---" marker, so not part of the commit message):

 * This patch was recovered from a copy whose line breaks and
   angle-bracketed text had been stripped. Line structure was rebuilt to
   agree with every hunk header and with the diffstat; template arguments
   were restored where the code requires them. Restored context lines
   (for example the JobFunction alias in JobQueue.h) and the sender
   address missing from the From: header should be confirmed against the
   tree before applying.
 * testIncorrectOrder() in Transaction_ordering_test.cpp now sets
   FORCE_MULTI_THREAD = true. The test previously called
   setThreadCount(0, false), which auto-tuned to 2 + min(cores, 4)
   threads; assigning false (already the Config.h default) made the
   override a no-op and dropped the multi-threaded setup the test had.
 * NOTE(review): the message says "up to twelve threads", but
   6 + min(count, 8) gives 14 for NODE_SIZE >= 4 with 16+ cores, and
   4 + min(count, 6) gives 10 for NODE_SIZE >= 3 with 8+ cores.
   Confirm which is intended.

 src/ripple/app/consensus/RCLValidations.cpp |  3 +-
 src/ripple/app/main/Application.cpp         | 27 +++++++++--
 src/ripple/core/Config.h                    |  4 +-
 src/ripple/core/JobQueue.h                  |  6 +--
 src/ripple/core/impl/Config.cpp             |  9 +++-
 src/ripple/core/impl/JobQueue.cpp           | 28 ++---------
 src/test/app/Transaction_ordering_test.cpp  | 19 ++++++--
 src/test/core/Coroutine_test.cpp            | 51 +++++++++++++--------
 8 files changed, 90 insertions(+), 57 deletions(-)

diff --git a/src/ripple/app/consensus/RCLValidations.cpp b/src/ripple/app/consensus/RCLValidations.cpp
index b22b73c4e..415f754f5 100644
--- a/src/ripple/app/consensus/RCLValidations.cpp
+++ b/src/ripple/app/consensus/RCLValidations.cpp
@@ -87,7 +87,8 @@ RCLValidatedLedger::operator[](Seq const& s) const -> ID
 
     JLOG(j_.warn()) << "Unable to determine hash of ancestor seq=" << s
                     << " from ledger hash=" << ledgerID_
-                    << " seq=" << ledgerSeq_;
+                    << " seq=" << ledgerSeq_ << " (available: " << minSeq()
+                    << "-" << seq() << ")";
     // Default ID that is less than all others
     return ID{0};
 }
diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp
index f8f1690f2..6dad8f240 100644
--- a/src/ripple/app/main/Application.cpp
+++ b/src/ripple/app/main/Application.cpp
@@ -283,6 +283,29 @@ public:
               logs_->journal("Collector")))
 
         , m_jobQueue(std::make_unique<JobQueue>(
+              [](std::unique_ptr<Config> const& config) {
+                  if (config->standalone() && !config->reporting() &&
+                      !config->FORCE_MULTI_THREAD)
+                      return 1;
+
+                  if (config->WORKERS)
+                      return config->WORKERS;
+
+                  auto count =
+                      static_cast<int>(std::thread::hardware_concurrency());
+
+                  // Be more aggressive about the number of threads to use
+                  // for the job queue if the server is configured as "large"
+                  // or "huge" if there are enough cores.
+                  if (config->NODE_SIZE >= 4 && count >= 16)
+                      count = 6 + std::min(count, 8);
+                  else if (config->NODE_SIZE >= 3 && count >= 8)
+                      count = 4 + std::min(count, 6);
+                  else
+                      count = 2 + std::min(count, 4);
+
+                  return count;
+              }(config_),
               m_collectorManager->group("jobq"),
               logs_->journal("JobQueue"),
               *logs_,
@@ -305,6 +328,7 @@ public:
               logs_->journal("TaggedCache"))
 
         , cachedSLEs_(std::chrono::minutes(1), stopwatch())
+
         , validatorKeys_(*config_, m_journal)
 
         , m_resourceManager(Resource::make_Manager(
@@ -1222,9 +1246,6 @@ ApplicationImp::setup()
     // Optionally turn off logging to console.
     logs_->silent(config_->silent());
 
-    m_jobQueue->setThreadCount(
-        config_->WORKERS, config_->standalone() && !config_->reporting());
-
     if (!config_->standalone())
         timeKeeper_->run(config_->SNTP_SERVERS);
 
diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h
index ea5044c2d..bd4957584 100644
--- a/src/ripple/core/Config.h
+++ b/src/ripple/core/Config.h
@@ -202,7 +202,9 @@ public:
     std::chrono::seconds AMENDMENT_MAJORITY_TIME = defaultAmendmentMajorityTime;
 
     // Thread pool configuration
-    std::size_t WORKERS = 0;
+    int WORKERS = 0;
+    // Can only be set in code, specifically unit tests
+    bool FORCE_MULTI_THREAD = false;
 
     // Normally the sweep timer is automatically deduced based on the node
     // size, but we allow admins to explicitly set it in the config.
diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h
index 1d3fd5498..a93d68b85 100644
--- a/src/ripple/core/JobQueue.h
+++ b/src/ripple/core/JobQueue.h
@@ -141,6 +141,7 @@ public:
     using JobFunction = std::function<void(Job&)>;
 
     JobQueue(
+        int threadCount,
         beast::insight::Collector::ptr const& collector,
         beast::Journal journal,
         Logs& logs,
@@ -200,11 +201,6 @@ public:
     int
     getJobCountGE(JobType t) const;
 
-    /** Set the number of thread serving the job queue to precisely this number.
-     */
-    void
-    setThreadCount(int c, bool const standaloneMode);
-
     /** Return a scoped LoadEvent.
      */
     std::unique_ptr<LoadEvent>
diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp
index b5a9677e2..e588553dd 100644
--- a/src/ripple/core/impl/Config.cpp
+++ b/src/ripple/core/impl/Config.cpp
@@ -637,7 +637,14 @@ Config::loadFromString(std::string const& fileContents)
     }
 
     if (getSingleSection(secConfig, SECTION_WORKERS, strTemp, j_))
-        WORKERS = beast::lexicalCastThrow<std::size_t>(strTemp);
+    {
+        WORKERS = beast::lexicalCastThrow<int>(strTemp);
+
+        if (WORKERS < 1 || WORKERS > 1024)
+            Throw<std::runtime_error>(
+                "Invalid " SECTION_WORKERS
+                ": must be between 1 and 1024 inclusive.");
+    }
 
     if (getSingleSection(secConfig, SECTION_COMPRESSION, strTemp, j_))
         COMPRESSION = beast::lexicalCastThrow<bool>(strTemp);
diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp
index 4fd394994..d87664416 100644
--- a/src/ripple/core/impl/JobQueue.cpp
+++ b/src/ripple/core/impl/JobQueue.cpp
@@ -25,6 +25,7 @@
 namespace ripple {
 
 JobQueue::JobQueue(
+    int threadCount,
     beast::insight::Collector::ptr const& collector,
     beast::Journal journal,
     Logs& logs,
@@ -33,11 +34,13 @@ JobQueue::JobQueue(
     , m_lastJob(0)
     , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
     , m_processCount(0)
-    , m_workers(*this, &perfLog, "JobQueue", 0)
+    , m_workers(*this, &perfLog, "JobQueue", threadCount)
     , m_cancelCallback(std::bind(&JobQueue::isStopping, this))
     , perfLog_(perfLog)
     , m_collector(collector)
 {
+    JLOG(m_journal.info()) << "Using " << threadCount << " threads";
+
     hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
     job_count = m_collector->make_gauge("job_count");
 
@@ -139,29 +142,6 @@ JobQueue::getJobCountGE(JobType t) const
     return ret;
 }
 
-void
-JobQueue::setThreadCount(int c, bool const standaloneMode)
-{
-    if (standaloneMode)
-    {
-        c = 1;
-    }
-    else if (c == 0)
-    {
-        c = static_cast<int>(std::thread::hardware_concurrency());
-        c = 2 + std::min(c, 4);  // I/O will bottleneck
-        JLOG(m_journal.info()) << "Auto-tuning to " << c
-                               << " validation/transaction/proposal threads.";
-    }
-    else
-    {
-        JLOG(m_journal.info()) << "Configured " << c
-                               << " validation/transaction/proposal threads.";
-    }
-
-    m_workers.setNumberOfThreads(c);
-}
-
 std::unique_ptr<LoadEvent>
 JobQueue::makeLoadEvent(JobType t, std::string const& name)
 {
diff --git a/src/test/app/Transaction_ordering_test.cpp b/src/test/app/Transaction_ordering_test.cpp
index d292c8906..0353df906 100644
--- a/src/test/app/Transaction_ordering_test.cpp
+++ b/src/test/app/Transaction_ordering_test.cpp
@@ -28,6 +28,7 @@ struct Transaction_ordering_test : public beast::unit_test::suite
     testCorrectOrder()
     {
         using namespace jtx;
+        testcase("Correct Order");
 
         Env env(*this);
         auto const alice = Account("alice");
@@ -69,8 +70,13 @@ struct Transaction_ordering_test : public beast::unit_test::suite
     {
         using namespace jtx;
 
-        Env env(*this);
-        env.app().getJobQueue().setThreadCount(0, false);
+        testcase("Incorrect order");
+
+        Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
+            cfg->FORCE_MULTI_THREAD = true;
+            return cfg;
+        }));
+
         auto const alice = Account("alice");
         env.fund(XRP(1000), noripple(alice));
 
@@ -109,8 +115,13 @@ struct Transaction_ordering_test : public beast::unit_test::suite
     {
         using namespace jtx;
 
-        Env env(*this);
-        env.app().getJobQueue().setThreadCount(0, false);
+        testcase("Incorrect order multiple intermediaries");
+
+        Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
+            cfg->FORCE_MULTI_THREAD = true;
+            return cfg;
+        }));
+
         auto const alice = Account("alice");
         env.fund(XRP(1000), noripple(alice));
 
diff --git a/src/test/core/Coroutine_test.cpp b/src/test/core/Coroutine_test.cpp
index 2abac6574..944602784 100644
--- a/src/test/core/Coroutine_test.cpp
+++ b/src/test/core/Coroutine_test.cpp
@@ -63,17 +63,23 @@ public:
     {
         using namespace std::chrono_literals;
         using namespace jtx;
-        Env env(*this);
-        auto& jq = env.app().getJobQueue();
-        jq.setThreadCount(0, false);
+
+        testcase("correct order");
+
+        Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
+            cfg->FORCE_MULTI_THREAD = true;
+            return cfg;
+        }));
+
         gate g1, g2;
         std::shared_ptr<JobQueue::Coro> c;
-        jq.postCoro(jtCLIENT, "Coroutine-Test", [&](auto const& cr) {
-            c = cr;
-            g1.signal();
-            c->yield();
-            g2.signal();
-        });
+        env.app().getJobQueue().postCoro(
+            jtCLIENT, "Coroutine-Test", [&](auto const& cr) {
+                c = cr;
+                g1.signal();
+                c->yield();
+                g2.signal();
+            });
         BEAST_EXPECT(g1.wait_for(5s));
         c->join();
         c->post();
@@ -85,15 +91,21 @@ public:
     {
         using namespace std::chrono_literals;
         using namespace jtx;
-        Env env(*this);
-        auto& jq = env.app().getJobQueue();
-        jq.setThreadCount(0, false);
+
+        testcase("incorrect order");
+
+        Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
+            cfg->FORCE_MULTI_THREAD = true;
+            return cfg;
+        }));
+
         gate g;
-        jq.postCoro(jtCLIENT, "Coroutine-Test", [&](auto const& c) {
-            c->post();
-            c->yield();
-            g.signal();
-        });
+        env.app().getJobQueue().postCoro(
+            jtCLIENT, "Coroutine-Test", [&](auto const& c) {
+                c->post();
+                c->yield();
+                g.signal();
+            });
         BEAST_EXPECT(g.wait_for(5s));
     }
 
@@ -102,9 +114,12 @@ public:
     {
         using namespace std::chrono_literals;
         using namespace jtx;
+
+        testcase("thread specific storage");
         Env env(*this);
+
         auto& jq = env.app().getJobQueue();
-        jq.setThreadCount(0, true);
+
         static int const N = 4;
         std::array<LocalValue<int>, N> a;
 