Adjust priorities of jobs:

The priority of different types of jobs was set back in the early
days of development, based on insight and observations that don't
necessarily apply any longer.

Specifically, job types used by the server to sync to the network
were being treated as lower priority than client requests, making
it more difficult to regain sync.

This commit adjusts the priority of several jobs and should allow
servers to prioritize resynchronizing to the network over serving
clients.
This commit is contained in:
Nik Bougalis
2021-11-12 22:10:23 -08:00
parent eb6b79bed7
commit 0c47cfad6f
3 changed files with 70 additions and 83 deletions

View File

@@ -41,16 +41,17 @@ enum JobType {
jtPACK, // Make a fetch pack for a peer jtPACK, // Make a fetch pack for a peer
jtPUBOLDLEDGER, // An old ledger has been accepted jtPUBOLDLEDGER, // An old ledger has been accepted
jtCLIENT, // A websocket command from the client
jtRPC, // A websocket command from the client
jtVALIDATION_ut, // A validation from an untrusted source jtVALIDATION_ut, // A validation from an untrusted source
jtUPDATE_PF, // Update pathfinding requests
jtTRANSACTION_l, // A local transaction jtTRANSACTION_l, // A local transaction
jtREPLAY_REQ, // Peer request a ledger delta or a skip list jtREPLAY_REQ, // Peer request a ledger delta or a skip list
jtLEDGER_REQ, // Peer request ledger/txnset data jtLEDGER_REQ, // Peer request ledger/txnset data
jtPROPOSAL_ut, // A proposal from an untrusted source jtPROPOSAL_ut, // A proposal from an untrusted source
jtREPLAY_TASK, // A Ledger replay task/subtask jtREPLAY_TASK, // A Ledger replay task/subtask
jtLEDGER_DATA, // Received data for a ledger we're acquiring jtLEDGER_DATA, // Received data for a ledger we're acquiring
jtCLIENT, // A websocket command from the client jtSWEEP, // Sweep for stale structures
jtRPC, // A websocket command from the client
jtUPDATE_PF, // Update pathfinding requests
jtTRANSACTION, // A transaction received from the network jtTRANSACTION, // A transaction received from the network
jtMISSING_TXN, // Request missing transactions jtMISSING_TXN, // Request missing transactions
jtREQUESTED_TXN, // Reply with requested transactions jtREQUESTED_TXN, // Reply with requested transactions
@@ -63,7 +64,6 @@ enum JobType {
jtWRITE, // Write out hashed objects jtWRITE, // Write out hashed objects
jtACCEPT, // Accept a consensus ledger jtACCEPT, // Accept a consensus ledger
jtPROPOSAL_t, // A proposal from a trusted source jtPROPOSAL_t, // A proposal from a trusted source
jtSWEEP, // Sweep for stale structures
jtNETOP_CLUSTER, // NetworkOPs cluster peer report jtNETOP_CLUSTER, // NetworkOPs cluster peer report
jtNETOP_TIMER, // NetworkOPs net timer processing jtNETOP_TIMER, // NetworkOPs net timer processing
jtADMIN, // An administrative operation jtADMIN, // An administrative operation

View File

@@ -31,11 +31,12 @@ private:
JobType const m_type; JobType const m_type;
std::string const m_name; std::string const m_name;
/** The limit on the number of running jobs for this job type. */ /** The limit on the number of running jobs for this job type.
int const m_limit;
/** Special jobs are not dispatched via the job queue */ A limit of 0 marks this as a "special job" which is not
bool const m_special; dispatched via the job queue.
*/
int const m_limit;
/** Average and peak latencies for this job type. 0 is none specified */ /** Average and peak latencies for this job type. 0 is none specified */
std::chrono::milliseconds const m_avgLatency; std::chrono::milliseconds const m_avgLatency;
@@ -49,13 +50,11 @@ public:
JobType type, JobType type,
std::string name, std::string name,
int limit, int limit,
bool special,
std::chrono::milliseconds avgLatency, std::chrono::milliseconds avgLatency,
std::chrono::milliseconds peakLatency) std::chrono::milliseconds peakLatency)
: m_type(type) : m_type(type)
, m_name(std::move(name)) , m_name(std::move(name))
, m_limit(limit) , m_limit(limit)
, m_special(special)
, m_avgLatency(avgLatency) , m_avgLatency(avgLatency)
, m_peakLatency(peakLatency) , m_peakLatency(peakLatency)
{ {
@@ -82,7 +81,7 @@ public:
bool bool
special() const special() const
{ {
return m_special; return m_limit == 0;
} }
std::chrono::milliseconds std::chrono::milliseconds

View File

@@ -41,63 +41,73 @@ private:
jtINVALID, jtINVALID,
"invalid", "invalid",
0, 0,
true,
std::chrono::milliseconds{0}, std::chrono::milliseconds{0},
std::chrono::milliseconds{0}) std::chrono::milliseconds{0})
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
int maxLimit = std::numeric_limits<int>::max(); int maxLimit = std::numeric_limits<int>::max();
add(jtPACK, "makeFetchPack", 1, false, 0ms, 0ms); auto add = [this](
add(jtPUBOLDLEDGER, "publishAcqLedger", 2, false, 10000ms, 15000ms); JobType jt,
add(jtVALIDATION_ut, std::string name,
"untrustedValidation", int limit,
maxLimit, std::chrono::milliseconds avgLatency,
false, std::chrono::milliseconds peakLatency) {
2000ms, assert(m_map.find(jt) == m_map.end());
5000ms);
add(jtTRANSACTION_l, "localTransaction", maxLimit, false, 100ms, 500ms);
add(jtREPLAY_REQ, "ledgerReplayRequest", 10, false, 250ms, 1000ms);
add(jtLEDGER_REQ, "ledgerRequest", 2, false, 0ms, 0ms);
add(jtPROPOSAL_ut, "untrustedProposal", maxLimit, false, 500ms, 1250ms);
add(jtREPLAY_TASK, "ledgerReplayTask", maxLimit, false, 0ms, 0ms);
add(jtLEDGER_DATA, "ledgerData", 2, false, 0ms, 0ms);
add(jtCLIENT, "clientCommand", maxLimit, false, 2000ms, 5000ms);
add(jtRPC, "RPC", maxLimit, false, 0ms, 0ms);
add(jtUPDATE_PF, "updatePaths", maxLimit, false, 0ms, 0ms);
add(jtTRANSACTION, "transaction", maxLimit, false, 250ms, 1000ms);
add(jtBATCH, "batch", maxLimit, false, 250ms, 1000ms);
add(jtADVANCE, "advanceLedger", maxLimit, false, 0ms, 0ms);
add(jtPUBLEDGER, "publishNewLedger", maxLimit, false, 3000ms, 4500ms);
add(jtTXN_DATA, "fetchTxnData", 1, false, 0ms, 0ms);
add(jtWAL, "writeAhead", maxLimit, false, 1000ms, 2500ms);
add(jtVALIDATION_t,
"trustedValidation",
maxLimit,
false,
500ms,
1500ms);
add(jtWRITE, "writeObjects", maxLimit, false, 1750ms, 2500ms);
add(jtACCEPT, "acceptLedger", maxLimit, false, 0ms, 0ms);
add(jtPROPOSAL_t, "trustedProposal", maxLimit, false, 100ms, 500ms);
add(jtSWEEP, "sweep", maxLimit, false, 0ms, 0ms);
add(jtNETOP_CLUSTER, "clusterReport", 1, false, 9999ms, 9999ms);
add(jtNETOP_TIMER, "heartbeat", 1, false, 999ms, 999ms);
add(jtADMIN, "administration", maxLimit, false, 0ms, 0ms);
add(jtMISSING_TXN, "handleHaveTransactions", 1200, false, 0ms, 0ms);
add(jtREQUESTED_TXN, "doTransactions", 1200, false, 0ms, 0ms);
add(jtPEER, "peerCommand", 0, true, 200ms, 2500ms); auto const [_, inserted] = m_map.emplace(
add(jtDISK, "diskAccess", 0, true, 500ms, 1000ms); std::piecewise_construct,
add(jtTXN_PROC, "processTransaction", 0, true, 0ms, 0ms); std::forward_as_tuple(jt),
add(jtOB_SETUP, "orderBookSetup", 0, true, 0ms, 0ms); std::forward_as_tuple(
add(jtPATH_FIND, "pathFind", 0, true, 0ms, 0ms); jt, name, limit, avgLatency, peakLatency));
add(jtHO_READ, "nodeRead", 0, true, 0ms, 0ms);
add(jtHO_WRITE, "nodeWrite", 0, true, 0ms, 0ms); assert(inserted == true);
add(jtGENERIC, "generic", 0, true, 0ms, 0ms); (void)_;
add(jtNS_SYNC_READ, "SyncReadNode", 0, true, 0ms, 0ms); (void)inserted;
add(jtNS_ASYNC_READ, "AsyncReadNode", 0, true, 0ms, 0ms); };
add(jtNS_WRITE, "WriteNode", 0, true, 0ms, 0ms);
// clang-format off
add(jtPACK, "makeFetchPack", 1, 0ms, 0ms);
add(jtPUBOLDLEDGER, "publishAcqLedger", 2, 10000ms, 15000ms);
add(jtVALIDATION_ut, "untrustedValidation", maxLimit, 2000ms, 5000ms);
add(jtTRANSACTION_l, "localTransaction", maxLimit, 100ms, 500ms);
add(jtREPLAY_REQ, "ledgerReplayRequest", 10, 250ms, 1000ms);
add(jtLEDGER_REQ, "ledgerRequest", 4, 0ms, 0ms);
add(jtPROPOSAL_ut, "untrustedProposal", maxLimit, 500ms, 1250ms);
add(jtREPLAY_TASK, "ledgerReplayTask", maxLimit, 0ms, 0ms);
add(jtLEDGER_DATA, "ledgerData", 4, 0ms, 0ms);
add(jtCLIENT, "clientCommand", maxLimit, 2000ms, 5000ms);
add(jtRPC, "RPC", maxLimit, 0ms, 0ms);
add(jtUPDATE_PF, "updatePaths", 1, 0ms, 0ms);
add(jtTRANSACTION, "transaction", maxLimit, 250ms, 1000ms);
add(jtBATCH, "batch", maxLimit, 250ms, 1000ms);
add(jtADVANCE, "advanceLedger", maxLimit, 0ms, 0ms);
add(jtPUBLEDGER, "publishNewLedger", maxLimit, 3000ms, 4500ms);
add(jtTXN_DATA, "fetchTxnData", 5, 0ms, 0ms);
add(jtWAL, "writeAhead", maxLimit, 1000ms, 2500ms);
add(jtVALIDATION_t, "trustedValidation", maxLimit, 500ms, 1500ms);
add(jtWRITE, "writeObjects", maxLimit, 1750ms, 2500ms);
add(jtACCEPT, "acceptLedger", maxLimit, 0ms, 0ms);
add(jtPROPOSAL_t, "trustedProposal", maxLimit, 100ms, 500ms);
add(jtSWEEP, "sweep", 1, 0ms, 0ms);
add(jtNETOP_CLUSTER, "clusterReport", 1, 9999ms, 9999ms);
add(jtNETOP_TIMER, "heartbeat", 1, 999ms, 999ms);
add(jtADMIN, "administration", maxLimit, 0ms, 0ms);
add(jtMISSING_TXN, "handleHaveTransactions", 1200, 0ms, 0ms);
add(jtREQUESTED_TXN, "doTransactions", 1200, 0ms, 0ms);
add(jtPEER, "peerCommand", 0, 200ms, 2500ms);
add(jtDISK, "diskAccess", 0, 500ms, 1000ms);
add(jtTXN_PROC, "processTransaction", 0, 0ms, 0ms);
add(jtOB_SETUP, "orderBookSetup", 0, 0ms, 0ms);
add(jtPATH_FIND, "pathFind", 0, 0ms, 0ms);
add(jtHO_READ, "nodeRead", 0, 0ms, 0ms);
add(jtHO_WRITE, "nodeWrite", 0, 0ms, 0ms);
add(jtGENERIC, "generic", 0, 0ms, 0ms);
add(jtNS_SYNC_READ, "SyncReadNode", 0, 0ms, 0ms);
add(jtNS_ASYNC_READ, "AsyncReadNode", 0, 0ms, 0ms);
add(jtNS_WRITE, "WriteNode", 0, 0ms, 0ms);
// clang-format on
} }
public: public:
@@ -162,28 +172,6 @@ public:
return m_map.cend(); return m_map.cend();
} }
private:
void
add(JobType jt,
std::string name,
int limit,
bool special,
std::chrono::milliseconds avgLatency,
std::chrono::milliseconds peakLatency)
{
assert(m_map.find(jt) == m_map.end());
auto const [_, inserted] = m_map.emplace(
std::piecewise_construct,
std::forward_as_tuple(jt),
std::forward_as_tuple(
jt, name, limit, special, avgLatency, peakLatency));
assert(inserted == true);
(void)_;
(void)inserted;
}
JobTypeInfo m_unknown; JobTypeInfo m_unknown;
Map m_map; Map m_map;
}; };