diff --git a/src/ripple/app/consensus/RCLValidations.cpp b/src/ripple/app/consensus/RCLValidations.cpp index b22b73c4e..415f754f5 100644 --- a/src/ripple/app/consensus/RCLValidations.cpp +++ b/src/ripple/app/consensus/RCLValidations.cpp @@ -87,7 +87,8 @@ RCLValidatedLedger::operator[](Seq const& s) const -> ID JLOG(j_.warn()) << "Unable to determine hash of ancestor seq=" << s << " from ledger hash=" << ledgerID_ - << " seq=" << ledgerSeq_; + << " seq=" << ledgerSeq_ << " (available: " << minSeq() + << "-" << seq() << ")"; // Default ID that is less than all others return ID{0}; } diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index f8f1690f2..6dad8f240 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -283,6 +283,29 @@ public: logs_->journal("Collector"))) , m_jobQueue(std::make_unique( + [](std::unique_ptr const& config) { + if (config->standalone() && !config->reporting() && + !config->FORCE_MULTI_THREAD) + return 1; + + if (config->WORKERS) + return config->WORKERS; + + auto count = + static_cast(std::thread::hardware_concurrency()); + + // Be more aggressive about the number of threads to use + // for the job queue if the server is configured as "large" + // or "huge" if there are enough cores. + if (config->NODE_SIZE >= 4 && count >= 16) + count = 6 + std::min(count, 8); + else if (config->NODE_SIZE >= 3 && count >= 8) + count = 4 + std::min(count, 6); + else + count = 2 + std::min(count, 4); + + return count; + }(config_), m_collectorManager->group("jobq"), logs_->journal("JobQueue"), *logs_, @@ -305,6 +328,7 @@ public: logs_->journal("TaggedCache")) , cachedSLEs_(std::chrono::minutes(1), stopwatch()) + , validatorKeys_(*config_, m_journal) , m_resourceManager(Resource::make_Manager( @@ -1222,9 +1246,6 @@ ApplicationImp::setup() // Optionally turn off logging to console. logs_->silent(config_->silent()); - m_jobQueue->setThreadCount( - config_->WORKERS, config_->standalone() && !config_->reporting()); - if (!config_->standalone()) timeKeeper_->run(config_->SNTP_SERVERS); diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index ea5044c2d..bd4957584 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -202,7 +202,9 @@ public: std::chrono::seconds AMENDMENT_MAJORITY_TIME = defaultAmendmentMajorityTime; // Thread pool configuration - std::size_t WORKERS = 0; + int WORKERS = 0; + // Can only be set in code, specifically unit tests + bool FORCE_MULTI_THREAD = false; // Normally the sweep timer is automatically deduced based on the node // size, but we allow admins to explicitly set it in the config. diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h index 1d3fd5498..a93d68b85 100644 --- a/src/ripple/core/JobQueue.h +++ b/src/ripple/core/JobQueue.h @@ -141,6 +141,7 @@ public: using JobFunction = std::function; JobQueue( + int threadCount, beast::insight::Collector::ptr const& collector, beast::Journal journal, Logs& logs, @@ -200,11 +201,6 @@ public: int getJobCountGE(JobType t) const; - /** Set the number of thread serving the job queue to precisely this number. - */ - void - setThreadCount(int c, bool const standaloneMode); - /** Return a scoped LoadEvent. */ std::unique_ptr diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index b5a9677e2..e588553dd 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -637,7 +637,14 @@ Config::loadFromString(std::string const& fileContents) } if (getSingleSection(secConfig, SECTION_WORKERS, strTemp, j_)) - WORKERS = beast::lexicalCastThrow(strTemp); + { + WORKERS = beast::lexicalCastThrow(strTemp); + + if (WORKERS < 1 || WORKERS > 1024) + Throw( + "Invalid " SECTION_WORKERS + ": must be between 1 and 1024 inclusive."); + } if (getSingleSection(secConfig, SECTION_COMPRESSION, strTemp, j_)) COMPRESSION = beast::lexicalCastThrow(strTemp); diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp index 4fd394994..d87664416 100644 --- a/src/ripple/core/impl/JobQueue.cpp +++ b/src/ripple/core/impl/JobQueue.cpp @@ -25,6 +25,7 @@ namespace ripple { JobQueue::JobQueue( + int threadCount, beast::insight::Collector::ptr const& collector, beast::Journal journal, Logs& logs, @@ -33,11 +34,13 @@ JobQueue::JobQueue( , m_lastJob(0) , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs) , m_processCount(0) - , m_workers(*this, &perfLog, "JobQueue", 0) + , m_workers(*this, &perfLog, "JobQueue", threadCount) , m_cancelCallback(std::bind(&JobQueue::isStopping, this)) , perfLog_(perfLog) , m_collector(collector) { + JLOG(m_journal.info()) << "Using " << threadCount << " threads"; + hook = m_collector->make_hook(std::bind(&JobQueue::collect, this)); job_count = m_collector->make_gauge("job_count"); @@ -139,29 +142,6 @@ JobQueue::getJobCountGE(JobType t) const return ret; } -void -JobQueue::setThreadCount(int c, bool const standaloneMode) -{ - if (standaloneMode) - { - c = 1; - } - else if (c == 0) - { - c = static_cast(std::thread::hardware_concurrency()); - c = 2 + std::min(c, 4); // I/O will bottleneck - JLOG(m_journal.info()) << "Auto-tuning to " << c - << " validation/transaction/proposal threads."; - } - else - { - JLOG(m_journal.info()) << "Configured " << c - << " validation/transaction/proposal threads."; - } - - m_workers.setNumberOfThreads(c); -} - std::unique_ptr JobQueue::makeLoadEvent(JobType t, std::string const& name) { diff --git a/src/test/app/Transaction_ordering_test.cpp b/src/test/app/Transaction_ordering_test.cpp index d292c8906..0353df906 100644 --- a/src/test/app/Transaction_ordering_test.cpp +++ b/src/test/app/Transaction_ordering_test.cpp @@ -28,6 +28,7 @@ struct Transaction_ordering_test : public beast::unit_test::suite testCorrectOrder() { using namespace jtx; + testcase("Correct Order"); Env env(*this); auto const alice = Account("alice"); @@ -69,8 +70,13 @@ struct Transaction_ordering_test : public beast::unit_test::suite { using namespace jtx; - Env env(*this); - env.app().getJobQueue().setThreadCount(0, false); + testcase("Incorrect order"); + + Env env(*this, envconfig([](std::unique_ptr cfg) { + cfg->FORCE_MULTI_THREAD = false; + return cfg; + })); + auto const alice = Account("alice"); env.fund(XRP(1000), noripple(alice)); @@ -109,8 +115,13 @@ struct Transaction_ordering_test : public beast::unit_test::suite { using namespace jtx; - Env env(*this); - env.app().getJobQueue().setThreadCount(0, false); + testcase("Incorrect order multiple intermediaries"); + + Env env(*this, envconfig([](std::unique_ptr cfg) { + cfg->FORCE_MULTI_THREAD = true; + return cfg; + })); + auto const alice = Account("alice"); env.fund(XRP(1000), noripple(alice)); diff --git a/src/test/core/Coroutine_test.cpp b/src/test/core/Coroutine_test.cpp index 2abac6574..944602784 100644 --- a/src/test/core/Coroutine_test.cpp +++ b/src/test/core/Coroutine_test.cpp @@ -63,17 +63,23 @@ public: { using namespace std::chrono_literals; using namespace jtx; - Env env(*this); - auto& jq = env.app().getJobQueue(); - jq.setThreadCount(0, false); + + testcase("correct order"); + + Env env(*this, envconfig([](std::unique_ptr cfg) { + cfg->FORCE_MULTI_THREAD = true; + return cfg; + })); + gate g1, g2; std::shared_ptr c; - jq.postCoro(jtCLIENT, "Coroutine-Test", [&](auto const& cr) { - c = cr; - g1.signal(); - c->yield(); - g2.signal(); - }); + env.app().getJobQueue().postCoro( + jtCLIENT, "Coroutine-Test", [&](auto const& cr) { + c = cr; + g1.signal(); + c->yield(); + g2.signal(); + }); BEAST_EXPECT(g1.wait_for(5s)); c->join(); c->post(); @@ -85,15 +91,21 @@ public: { using namespace std::chrono_literals; using namespace jtx; - Env env(*this); - auto& jq = env.app().getJobQueue(); - jq.setThreadCount(0, false); + + testcase("incorrect order"); + + Env env(*this, envconfig([](std::unique_ptr cfg) { + cfg->FORCE_MULTI_THREAD = true; + return cfg; + })); + gate g; - jq.postCoro(jtCLIENT, "Coroutine-Test", [&](auto const& c) { - c->post(); - c->yield(); - g.signal(); - }); + env.app().getJobQueue().postCoro( + jtCLIENT, "Coroutine-Test", [&](auto const& c) { + c->post(); + c->yield(); + g.signal(); + }); BEAST_EXPECT(g.wait_for(5s)); } @@ -102,9 +114,12 @@ public: { using namespace std::chrono_literals; using namespace jtx; + + testcase("thread specific storage"); Env env(*this); + auto& jq = env.app().getJobQueue(); - jq.setThreadCount(0, true); + static int const N = 4; std::array, N> a;