From 0c47cfad6f202b62c9ddabb541b126a9c32cf717 Mon Sep 17 00:00:00 2001 From: Nik Bougalis Date: Fri, 12 Nov 2021 22:10:23 -0800 Subject: [PATCH] Adjust priorities of jobs: The priority of different types of jobs was set back in the early days of development, based on insight and observations that don't necessarily apply any longer. Specifically, job types used by the server to sync to the network were being treated as lower priority than client requests, making it more difficult to regain sync. This commit adjusts the priority of several jobs and should allow servers to prioritize resynchronizing to the network over serving clients. --- src/ripple/core/Job.h | 8 +-- src/ripple/core/JobTypeInfo.h | 13 ++-- src/ripple/core/JobTypes.h | 132 ++++++++++++++++------------------ 3 files changed, 70 insertions(+), 83 deletions(-) diff --git a/src/ripple/core/Job.h b/src/ripple/core/Job.h index 15552be70..8f603e8d2 100644 --- a/src/ripple/core/Job.h +++ b/src/ripple/core/Job.h @@ -41,16 +41,17 @@ enum JobType { jtPACK, // Make a fetch pack for a peer jtPUBOLDLEDGER, // An old ledger has been accepted + jtCLIENT, // A websocket command from the client + jtRPC, // A websocket command from the client jtVALIDATION_ut, // A validation from an untrusted source + jtUPDATE_PF, // Update pathfinding requests jtTRANSACTION_l, // A local transaction jtREPLAY_REQ, // Peer request a ledger delta or a skip list jtLEDGER_REQ, // Peer request ledger/txnset data jtPROPOSAL_ut, // A proposal from an untrusted source jtREPLAY_TASK, // A Ledger replay task/subtask jtLEDGER_DATA, // Received data for a ledger we're acquiring - jtCLIENT, // A websocket command from the client - jtRPC, // A websocket command from the client - jtUPDATE_PF, // Update pathfinding requests + jtSWEEP, // Sweep for stale structures jtTRANSACTION, // A transaction received from the network jtMISSING_TXN, // Request missing transactions jtREQUESTED_TXN, // Reply with requested transactions @@ -63,7 +64,6 @@ enum JobType { jtWRITE, // Write out hashed objects jtACCEPT, // Accept a consensus ledger jtPROPOSAL_t, // A proposal from a trusted source - jtSWEEP, // Sweep for stale structures jtNETOP_CLUSTER, // NetworkOPs cluster peer report jtNETOP_TIMER, // NetworkOPs net timer processing jtADMIN, // An administrative operation diff --git a/src/ripple/core/JobTypeInfo.h b/src/ripple/core/JobTypeInfo.h index f4bb79b05..7ed7c4697 100644 --- a/src/ripple/core/JobTypeInfo.h +++ b/src/ripple/core/JobTypeInfo.h @@ -31,11 +31,12 @@ private: JobType const m_type; std::string const m_name; - /** The limit on the number of running jobs for this job type. */ - int const m_limit; + /** The limit on the number of running jobs for this job type. - /** Special jobs are not dispatched via the job queue */ - bool const m_special; + A limit of 0 marks this as a "special job" which is not + dispatched via the job queue. + */ + int const m_limit; /** Average and peak latencies for this job type. 0 is none specified */ std::chrono::milliseconds const m_avgLatency; @@ -49,13 +50,11 @@ public: JobType type, std::string name, int limit, - bool special, std::chrono::milliseconds avgLatency, std::chrono::milliseconds peakLatency) : m_type(type) , m_name(std::move(name)) , m_limit(limit) - , m_special(special) , m_avgLatency(avgLatency) , m_peakLatency(peakLatency) { @@ -82,7 +81,7 @@ public: bool special() const { - return m_special; + return m_limit == 0; } std::chrono::milliseconds diff --git a/src/ripple/core/JobTypes.h b/src/ripple/core/JobTypes.h index 45f69a530..624ba227e 100644 --- a/src/ripple/core/JobTypes.h +++ b/src/ripple/core/JobTypes.h @@ -41,63 +41,73 @@ private: jtINVALID, "invalid", 0, - true, std::chrono::milliseconds{0}, std::chrono::milliseconds{0}) { using namespace std::chrono_literals; int maxLimit = std::numeric_limits::max(); - add(jtPACK, "makeFetchPack", 1, false, 0ms, 0ms); - add(jtPUBOLDLEDGER, "publishAcqLedger", 2, false, 10000ms, 15000ms); - add(jtVALIDATION_ut, - "untrustedValidation", - maxLimit, - false, - 2000ms, - 5000ms); - add(jtTRANSACTION_l, "localTransaction", maxLimit, false, 100ms, 500ms); - add(jtREPLAY_REQ, "ledgerReplayRequest", 10, false, 250ms, 1000ms); - add(jtLEDGER_REQ, "ledgerRequest", 2, false, 0ms, 0ms); - add(jtPROPOSAL_ut, "untrustedProposal", maxLimit, false, 500ms, 1250ms); - add(jtREPLAY_TASK, "ledgerReplayTask", maxLimit, false, 0ms, 0ms); - add(jtLEDGER_DATA, "ledgerData", 2, false, 0ms, 0ms); - add(jtCLIENT, "clientCommand", maxLimit, false, 2000ms, 5000ms); - add(jtRPC, "RPC", maxLimit, false, 0ms, 0ms); - add(jtUPDATE_PF, "updatePaths", maxLimit, false, 0ms, 0ms); - add(jtTRANSACTION, "transaction", maxLimit, false, 250ms, 1000ms); - add(jtBATCH, "batch", maxLimit, false, 250ms, 1000ms); - add(jtADVANCE, "advanceLedger", maxLimit, false, 0ms, 0ms); - add(jtPUBLEDGER, "publishNewLedger", maxLimit, false, 3000ms, 4500ms); - add(jtTXN_DATA, "fetchTxnData", 1, false, 0ms, 0ms); - add(jtWAL, "writeAhead", maxLimit, false, 1000ms, 2500ms); - add(jtVALIDATION_t, - "trustedValidation", - maxLimit, - false, - 500ms, - 1500ms); - add(jtWRITE, "writeObjects", maxLimit, false, 1750ms, 2500ms); - add(jtACCEPT, "acceptLedger", maxLimit, false, 0ms, 0ms); - add(jtPROPOSAL_t, "trustedProposal", maxLimit, false, 100ms, 500ms); - add(jtSWEEP, "sweep", maxLimit, false, 0ms, 0ms); - add(jtNETOP_CLUSTER, "clusterReport", 1, false, 9999ms, 9999ms); - add(jtNETOP_TIMER, "heartbeat", 1, false, 999ms, 999ms); - add(jtADMIN, "administration", maxLimit, false, 0ms, 0ms); - add(jtMISSING_TXN, "handleHaveTransactions", 1200, false, 0ms, 0ms); - add(jtREQUESTED_TXN, "doTransactions", 1200, false, 0ms, 0ms); + auto add = [this]( + JobType jt, + std::string name, + int limit, + std::chrono::milliseconds avgLatency, + std::chrono::milliseconds peakLatency) { + assert(m_map.find(jt) == m_map.end()); - add(jtPEER, "peerCommand", 0, true, 200ms, 2500ms); - add(jtDISK, "diskAccess", 0, true, 500ms, 1000ms); - add(jtTXN_PROC, "processTransaction", 0, true, 0ms, 0ms); - add(jtOB_SETUP, "orderBookSetup", 0, true, 0ms, 0ms); - add(jtPATH_FIND, "pathFind", 0, true, 0ms, 0ms); - add(jtHO_READ, "nodeRead", 0, true, 0ms, 0ms); - add(jtHO_WRITE, "nodeWrite", 0, true, 0ms, 0ms); - add(jtGENERIC, "generic", 0, true, 0ms, 0ms); - add(jtNS_SYNC_READ, "SyncReadNode", 0, true, 0ms, 0ms); - add(jtNS_ASYNC_READ, "AsyncReadNode", 0, true, 0ms, 0ms); - add(jtNS_WRITE, "WriteNode", 0, true, 0ms, 0ms); + auto const [_, inserted] = m_map.emplace( + std::piecewise_construct, + std::forward_as_tuple(jt), + std::forward_as_tuple( + jt, name, limit, avgLatency, peakLatency)); + + assert(inserted == true); + (void)_; + (void)inserted; + }; + + // clang-format off + add(jtPACK, "makeFetchPack", 1, 0ms, 0ms); + add(jtPUBOLDLEDGER, "publishAcqLedger", 2, 10000ms, 15000ms); + add(jtVALIDATION_ut, "untrustedValidation", maxLimit, 2000ms, 5000ms); + add(jtTRANSACTION_l, "localTransaction", maxLimit, 100ms, 500ms); + add(jtREPLAY_REQ, "ledgerReplayRequest", 10, 250ms, 1000ms); + add(jtLEDGER_REQ, "ledgerRequest", 4, 0ms, 0ms); + add(jtPROPOSAL_ut, "untrustedProposal", maxLimit, 500ms, 1250ms); + add(jtREPLAY_TASK, "ledgerReplayTask", maxLimit, 0ms, 0ms); + add(jtLEDGER_DATA, "ledgerData", 4, 0ms, 0ms); + add(jtCLIENT, "clientCommand", maxLimit, 2000ms, 5000ms); + add(jtRPC, "RPC", maxLimit, 0ms, 0ms); + add(jtUPDATE_PF, "updatePaths", 1, 0ms, 0ms); + add(jtTRANSACTION, "transaction", maxLimit, 250ms, 1000ms); + add(jtBATCH, "batch", maxLimit, 250ms, 1000ms); + add(jtADVANCE, "advanceLedger", maxLimit, 0ms, 0ms); + add(jtPUBLEDGER, "publishNewLedger", maxLimit, 3000ms, 4500ms); + add(jtTXN_DATA, "fetchTxnData", 5, 0ms, 0ms); + add(jtWAL, "writeAhead", maxLimit, 1000ms, 2500ms); + add(jtVALIDATION_t, "trustedValidation", maxLimit, 500ms, 1500ms); + add(jtWRITE, "writeObjects", maxLimit, 1750ms, 2500ms); + add(jtACCEPT, "acceptLedger", maxLimit, 0ms, 0ms); + add(jtPROPOSAL_t, "trustedProposal", maxLimit, 100ms, 500ms); + add(jtSWEEP, "sweep", 1, 0ms, 0ms); + add(jtNETOP_CLUSTER, "clusterReport", 1, 9999ms, 9999ms); + add(jtNETOP_TIMER, "heartbeat", 1, 999ms, 999ms); + add(jtADMIN, "administration", maxLimit, 0ms, 0ms); + add(jtMISSING_TXN, "handleHaveTransactions", 1200, 0ms, 0ms); + add(jtREQUESTED_TXN, "doTransactions", 1200, 0ms, 0ms); + + add(jtPEER, "peerCommand", 0, 200ms, 2500ms); + add(jtDISK, "diskAccess", 0, 500ms, 1000ms); + add(jtTXN_PROC, "processTransaction", 0, 0ms, 0ms); + add(jtOB_SETUP, "orderBookSetup", 0, 0ms, 0ms); + add(jtPATH_FIND, "pathFind", 0, 0ms, 0ms); + add(jtHO_READ, "nodeRead", 0, 0ms, 0ms); + add(jtHO_WRITE, "nodeWrite", 0, 0ms, 0ms); + add(jtGENERIC, "generic", 0, 0ms, 0ms); + add(jtNS_SYNC_READ, "SyncReadNode", 0, 0ms, 0ms); + add(jtNS_ASYNC_READ, "AsyncReadNode", 0, 0ms, 0ms); + add(jtNS_WRITE, "WriteNode", 0, 0ms, 0ms); + // clang-format on } public: @@ -162,28 +172,6 @@ public: return m_map.cend(); } -private: - void - add(JobType jt, - std::string name, - int limit, - bool special, - std::chrono::milliseconds avgLatency, - std::chrono::milliseconds peakLatency) - { - assert(m_map.find(jt) == m_map.end()); - - auto const [_, inserted] = m_map.emplace( - std::piecewise_construct, - std::forward_as_tuple(jt), - std::forward_as_tuple( - jt, name, limit, special, avgLatency, peakLatency)); - - assert(inserted == true); - (void)_; - (void)inserted; - } - JobTypeInfo m_unknown; Map m_map; };