From e1b1382599f4a1387b300faf57df6a360fc05670 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Fri, 5 Feb 2021 14:54:42 +0530 Subject: [PATCH] Read request configuration options. (#236) * Ignores read requests when contract execution disabled. * Added concurrent_read_reqeuests config. * Improved metric test script. --- src/conf.cpp | 9 +++ src/conf.hpp | 3 + src/usr/read_req.cpp | 5 +- src/usr/usr.cpp | 4 ++ test/local-cluster/cluster-create.sh | 53 ++++++++-------- test/metrics/metrics.js | 91 +++++++++++++++++++--------- 6 files changed, 107 insertions(+), 58 deletions(-) diff --git a/src/conf.cpp b/src/conf.cpp index 35cdadaa..24692030 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -400,6 +400,7 @@ namespace conf cfg.user.max_bytes_per_min = user["max_bytes_per_min"].as(); cfg.user.max_bad_msgs_per_min = user["max_bad_msgs_per_min"].as(); cfg.user.idle_timeout = user["idle_timeout"].as(); + cfg.user.concurrent_read_reqeuests = user["concurrent_read_reqeuests"].as(); cfg.user.enabled = user["enabled"].as(); } catch (const std::exception &e) @@ -501,6 +502,7 @@ namespace conf user_config.insert_or_assign("max_connections", cfg.user.max_connections); user_config.insert_or_assign("max_in_connections_per_host", cfg.user.max_in_connections_per_host); user_config.insert_or_assign("enabled", cfg.user.enabled); + user_config.insert_or_assign("concurrent_read_reqeuests", cfg.user.concurrent_read_reqeuests); d.insert_or_assign("user", user_config); } @@ -556,6 +558,13 @@ namespace conf return -1; } + // User settings + if (cfg.user.concurrent_read_reqeuests > CONCURRENT_READ_REQUEST_MAX_LIMIT) + { + std::cerr << "User concurrent_read_reqeuests cannot exceed " << CONCURRENT_READ_REQUEST_MAX_LIMIT << "\n"; + return -1; + } + // Log settings const std::unordered_set valid_loglevels({"dbg", "inf", "wrn", "err"}); if (valid_loglevels.count(cfg.log.loglevel) != 1) diff --git a/src/conf.hpp b/src/conf.hpp index f318d06b..d92340e6 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -9,6 +9,8 @@ */ namespace conf { + constexpr size_t CONCURRENT_READ_REQUEST_MAX_LIMIT = 32; + // Struct to represent ip and port of the peer. struct ip_port_prop { @@ -109,6 +111,7 @@ namespace conf uint64_t max_bad_msgs_per_min = 0; // User bad messages per minute uint16_t max_connections = 0; // Max inbound user connections uint16_t max_in_connections_per_host = 0; // Max inbound user connections per remote host (IP). + uint64_t concurrent_read_reqeuests = 10; // Supported concurrent read requests count. bool enabled = true; // User connections enable/disable. }; diff --git a/src/usr/read_req.cpp b/src/usr/read_req.cpp index bb301223..f4d6e36a 100644 --- a/src/usr/read_req.cpp +++ b/src/usr/read_req.cpp @@ -14,7 +14,6 @@ namespace read_req { constexpr uint16_t LOOP_WAIT = 100; // Milliseconds. constexpr uint16_t MAX_QUEUE_SIZE = 64; // Maximum read request queue size, The size passed is rounded up to the next multiple of the block size (32). - constexpr uint16_t MAX_THREAD_CAP = 5; // Maximum number of read request processing threads. bool is_shutting_down = false; bool init_success = false; @@ -22,7 +21,7 @@ namespace read_req util::buffer_store read_req_store; std::thread thread_pool_executor; // Thread which spawns new threads for the read requests is the queue. std::vector read_req_threads; - moodycamel::ConcurrentQueue read_req_queue(MAX_QUEUE_SIZE, 0, MAX_THREAD_CAP); + moodycamel::ConcurrentQueue read_req_queue(MAX_QUEUE_SIZE, 0, conf::CONCURRENT_READ_REQUEST_MAX_LIMIT); std::mutex execution_contexts_mutex; std::list execution_contexts; std::mutex completed_threads_mutex; @@ -91,7 +90,7 @@ namespace read_req } } - if (read_req_queue.size_approx() != 0 && read_req_threads.size() <= MAX_THREAD_CAP) + if (read_req_queue.size_approx() != 0 && read_req_threads.size() <= conf::cfg.user.concurrent_read_reqeuests) { read_req_threads.push_back(std::thread(read_request_processor)); if (read_req_queue.size_approx() == 1) diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 18555dd1..1bc4d2ff 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -144,6 +144,10 @@ namespace usr if (msg_type == msg::usrmsg::MSGTYPE_CONTRACT_READ_REQUEST) { + // Ignore the request if contract execution is disabled or read requests disallowed. + if (!conf::cfg.contract.execute || conf::cfg.user.concurrent_read_reqeuests == 0) + return 0; + std::string content; if (parser.extract_read_request(content) != -1) { diff --git a/test/local-cluster/cluster-create.sh b/test/local-cluster/cluster-create.sh index 8bf3aa88..6343beaf 100755 --- a/test/local-cluster/cluster-create.sh +++ b/test/local-cluster/cluster-create.sh @@ -84,36 +84,39 @@ do # Update config. contract_json=$(node -p "JSON.stringify({...require('./tmp.json').contract, - id: '3c349abe-4d70-4f50-9fa6-018f1f2530ab', \ - bin_path: '$binary', \ - bin_args: '$binargs', \ - roundtime: $roundtime, \ - consensus: 'public', \ - npl: 'public', \ - appbill: { \ - mode: '', \ - bin_args: '' \ - },}, null, 2)") + id: '3c349abe-4d70-4f50-9fa6-018f1f2530ab', \ + bin_path: '$binary', \ + bin_args: '$binargs', \ + roundtime: $roundtime, \ + consensus: 'public', \ + npl: 'public', \ + appbill: { \ + mode: '', \ + bin_args: '' \ + } + }, null, 2)") mesh_json=$(node -p "JSON.stringify({...require('./tmp.json').mesh, \ - port:${peerport}, \ - peer_discovery: { \ - enabled: true, \ - interval: 10000 \ - } + port:${peerport}, \ + peer_discovery: { \ + enabled: true, \ + interval: 10000 \ + } + }, null, 2)") + user_json=$(node -p "JSON.stringify({...require('./tmp.json').user, \ + port:${pubport} + }, null, 2)") + + log_json=$(node -p "JSON.stringify({...require('./tmp.json').log, \ + loglevel: '$loglevel', \ + loggers:['console', 'file'] \ }, null, 2)") - user_json=$(node -p "JSON.stringify({...require('./tmp.json').user, port:${pubport}}, null, 2)") node -p "JSON.stringify({...require('./tmp.json'), \ - contract: ${contract_json},\ - mesh: ${mesh_json},\ - user: ${user_json}, \ - log: {\ - loglevel: '$loglevel', \ - max_mbytes_per_file: 10, \ - max_file_count": 50, \ - loggers:['console', 'file'] \ - }\ + contract: ${contract_json},\ + mesh: ${mesh_json},\ + user: ${user_json}, \ + log: ${log_json}, \ }, null, 2)" > hp.cfg rm tmp.json diff --git a/test/metrics/metrics.js b/test/metrics/metrics.js index a2273d47..61060a0e 100644 --- a/test/metrics/metrics.js +++ b/test/metrics/metrics.js @@ -13,23 +13,32 @@ async function main() { HotPocket.setLogLevel(1); const tests = { - "Large payload": largePayload, - "Single user read requests": singleUserReadRequests, - "Single user Input/Output": singleUserInputOutput, - "Multi user read requests": multiUserReadRequests, - "Multi user Input/Output": multiUserInputOutput, + "Large payload": () => largePayload(2), + "Single user read requests": () => singleUserReadRequests(10, 10), + "Single user Input/Output": () => singleUserInputOutput(10, 10), + "Multi user read requests": () => multiUserReadRequests(10, 10, 10), + "Multi user Input/Output": () => multiUserInputOutput(10, 10, 10), }; + // Execute all tests. for (const test in tests) { console.log(test + "..."); - const start = new Date(); - await tests[test](); + // The test will return single or multiple tuples of time periods. + // Multiple tuples mean that the test had multiple sub-tests inside it. + // Each tuple indicates [start time, end time] of a particular atomic test. + const result = await tests[test](); - const end = new Date(); - const duration = end.getTime() - start.getTime(); + // If the result is a single period tuple, put them in a parent array. + const runPeriods = Array.isArray(result[0]) ? result : [result]; + // Duration is calculated as the duration between earliest start time and latest end time. + const startTimes = runPeriods.map(p => p[0]); + const endTimes = runPeriods.map(p => p[1]); + const minStartTime = Math.min.apply(null, startTimes); + const maxEndTime = Math.max.apply(null, endTimes); + const duration = maxEndTime - minStartTime; console.log(duration + "ms"); } @@ -53,42 +62,50 @@ async function createClient() { return hpc; } -function singleUserReadRequests() { +function singleUserReadRequests(payloadKB, requestCount) { return new Promise(async resolve => { - const payload = "A".repeat(10 * 1024); - const requestCount = 10; + const payload = "A".repeat(payloadKB * 1024); let respCount = 0; const hpc = await createClient(); + const timer = new Timer(); + hpc.on(HotPocket.events.contractReadResponse, (response) => { respCount++; - if (respCount == requestCount) - hpc.close().then(() => resolve()); + if (respCount == requestCount) { + const runPeriod = timer.stop(); + hpc.close().then(() => resolve(runPeriod)); + } }); + timer.start(); for (let i = 0; i < requestCount; i++) { hpc.sendContractReadRequest(payload); } }) } -function singleUserInputOutput() { +function singleUserInputOutput(payloadKB, requestCount) { return new Promise(async (resolve) => { - const payload = "A".repeat(10 * 1024); - const requestCount = 10; + const payload = "A".repeat(payloadKB * 1024); let respCount = 0; const hpc = await createClient(); + const timer = new Timer(); + hpc.on(HotPocket.events.contractOutput, (response) => { respCount++; - if (respCount == requestCount) - hpc.close().then(() => resolve()); + if (respCount == requestCount) { + const runPeriod = timer.stop(); + hpc.close().then(() => resolve(runPeriod)); + } }); + timer.start(); for (let i = 0; i < requestCount; i++) { const nonce = i.toString().padStart(5); await hpc.sendContractInput(payload, nonce, 20); @@ -96,42 +113,56 @@ function singleUserInputOutput() { }) } -function multiUserReadRequests() { +function multiUserReadRequests(payloadKB, requestCountPerUser, userCount) { - const userCount = 10; const tasks = []; - for (let i = 0; i < userCount; i++) { - tasks.push(singleUserReadRequests()); + tasks.push(singleUserReadRequests(payloadKB, requestCountPerUser)); } return Promise.all(tasks); } -function multiUserInputOutput() { +function multiUserInputOutput(payloadKB, requestCountPerUser, userCount) { - const userCount = 10; const tasks = []; - for (let i = 0; i < userCount; i++) { - tasks.push(singleUserInputOutput()); + tasks.push(singleUserInputOutput(payloadKB, requestCountPerUser)); } return Promise.all(tasks); } -function largePayload() { +function largePayload(payloadMB) { return new Promise(async (resolve) => { - const payload = "A".repeat(2 * 1024 * 1024); + const payload = "A".repeat(payloadMB * 1024 * 1024); const hpc = await createClient(); + const timer = new Timer(); + hpc.on(HotPocket.events.contractOutput, (response) => { if (response.length < payload.length) console.log("Payload length mismatch."); - hpc.close().then(() => resolve()); + + const runPeriod = timer.stop(); + hpc.close().then(() => resolve(runPeriod)); }); + timer.start(); await hpc.sendContractInput(payload); }) } +function Timer() { + let startedOn = null; + + this.start = () => { + startedOn = new Date().getTime(); + } + + this.stop = () => { + const endedOn = new Date().getTime(); + return [startedOn, endedOn]; + } +} + main(); \ No newline at end of file