diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index 8b0cb524..f479e6e2 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -88,10 +88,15 @@ namespace comm std::vector msg(data.size()); memcpy(msg.data(), data.data(), data.size()); + bool enqueued = false; + if (priority == 1) - in_msg_queue1.try_enqueue(std::move(msg)); + enqueued = in_msg_queue1.try_enqueue(std::move(msg)); else if (priority == 2) - in_msg_queue2.try_enqueue(std::move(msg)); + enqueued = in_msg_queue2.try_enqueue(std::move(msg)); + + if (!enqueued) + LOG_WARNING << "Failed to enqueue comm msg."; } // Signal the hpws client that we are ready for next message. diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 80b6876b..00fa208c 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -152,7 +152,8 @@ namespace usr std::string content; if (parser.extract_read_request(content) != -1) { - read_req::populate_read_req_queue(user.pubkey, std::move(content)); + if (read_req::populate_read_req_queue(user.pubkey, std::move(content)) == -1) + LOG_WARNING << "Failed to enqueue read request."; return 0; } else diff --git a/test/metrics/metrics.js b/test/metrics/metrics.js index 6a9b7fcd..b41bfb15 100644 --- a/test/metrics/metrics.js +++ b/test/metrics/metrics.js @@ -8,21 +8,26 @@ if (process.argv.length == 3) server = 'wss://localhost:' + process.argv[2]; if (process.argv.length == 4) server = 'wss://' + process.argv[2] + ':' + process.argv[3]; console.log("Server: " + server); +let activity = {}; + async function main() { HotPocket.setLogLevel(1); const tests = { "Large payload": () => largePayload(2), - "Single user read requests": () => singleUserReadRequests(10, 10), - "Single user Input/Output": () => singleUserInputOutput(10, 10), + "Single user read requests": () => multiUserReadRequests(10, 10, 1), + "Single user Input/Output": () => multiUserInputOutput(10, 10, 1), "Multi user read requests": () => multiUserReadRequests(10, 10, 10), "Multi user Input/Output": () => multiUserInputOutput(10, 10, 10), }; + activityLogger(); + // Execute all tests. for (const test in tests) { + console.log(); console.log(test + "..."); // The test will return single or multiple tuples of time periods. @@ -30,6 +35,9 @@ async function main() { // Each tuple indicates [start time, end time] of a particular atomic test. const result = await tests[test](); + // Test ended. Clear activity. + activity = {}; + // If the result is a single period tuple, put them in a parent array. const runPeriods = Array.isArray(result[0]) ? result : [result]; @@ -42,10 +50,20 @@ async function main() { console.log(duration + "ms"); } + activity = null; console.log("Done."); } +async function activityLogger() { + if (activity && Object.keys(activity).length > 0) + console.log(JSON.stringify(activity)); + + if (activity) + setTimeout(() => activityLogger(), 1000); +} + async function createClient() { + increment("creating"); const keys = await HotPocket.generateKeys(); const hpc = await HotPocket.createClient([server], keys, @@ -58,6 +76,8 @@ async function createClient() { if (!await hpc.connect()) { throw "Connection failed." } + decrement("creating"); + increment("clients"); return hpc; } @@ -73,6 +93,7 @@ function singleUserReadRequests(payloadKB, requestCount) { const timer = new Timer(); hpc.on(HotPocket.events.contractReadResponse, (response) => { + increment("readresp"); respCount++; if (respCount == requestCount) { const runPeriod = timer.stop(); @@ -82,13 +103,16 @@ function singleUserReadRequests(payloadKB, requestCount) { timer.start(); for (let i = 0; i < requestCount; i++) { - hpc.sendContractReadRequest(payload); + increment("submitting"); + hpc.sendContractReadRequest(payload).then(() => { + decrement("submitting"); + increment("submitted"); + }); } }) } function singleUserInputOutput(payloadKB, requestCount) { - return new Promise(async (resolve) => { const payload = "A".repeat(payloadKB * 1024); @@ -99,6 +123,7 @@ function singleUserInputOutput(payloadKB, requestCount) { hpc.on(HotPocket.events.contractOutput, (r) => { r.outputs.forEach(response => { + increment("outputs"); respCount++; if (respCount == requestCount) { const runPeriod = timer.stop(); @@ -109,17 +134,19 @@ function singleUserInputOutput(payloadKB, requestCount) { timer.start(); for (let i = 0; i < requestCount; i++) { + increment("submitting"); const input = await hpc.submitContractInput(payload); - input.submissionStatus.then(s => { - if (s.status != "accepted") - console.log(s.reason); - });; + decrement("submitting"); + increment("submitted"); + input.submissionStatus.then(onStatusResponse); } }) } function multiUserReadRequests(payloadKB, requestCountPerUser, userCount) { + console.log("Submitting " + (requestCountPerUser * userCount) + " requests."); + const tasks = []; for (let i = 0; i < userCount; i++) { tasks.push(singleUserReadRequests(payloadKB, requestCountPerUser)); @@ -129,6 +156,8 @@ function multiUserReadRequests(payloadKB, requestCountPerUser, userCount) { function multiUserInputOutput(payloadKB, requestCountPerUser, userCount) { + console.log("Submitting " + (requestCountPerUser * userCount) + " requests."); + const tasks = []; for (let i = 0; i < userCount; i++) { tasks.push(singleUserInputOutput(payloadKB, requestCountPerUser)); @@ -137,6 +166,7 @@ function multiUserInputOutput(payloadKB, requestCountPerUser, userCount) { } function largePayload(payloadMB) { + console.log("Submitting " + payloadMB + " MB request.") return new Promise(async (resolve) => { const payload = "A".repeat(payloadMB * 1024 * 1024); @@ -146,6 +176,7 @@ function largePayload(payloadMB) { hpc.on(HotPocket.events.contractOutput, (r) => { r.outputs.forEach(response => { + increment("outputs"); if (response.length < payload.length) console.log("Payload length mismatch."); @@ -155,14 +186,42 @@ function largePayload(payloadMB) { }); timer.start(); + increment("submitting"); const input = await hpc.submitContractInput(payload); - input.submissionStatus.then(s => { - if (s.status != "accepted") - console.log(s.reason); - });; + decrement("submitting"); + increment("submitted"); + input.submissionStatus.then(onStatusResponse); }) } +function increment(key) { + if (!activity[key]) + activity[key] = 0; + activity[key]++; +} + +function decrement(key) { + activity[key]--; + if (activity[key] == 0) + delete activity[key]; +} + +function onStatusResponse(s) { + + increment("statresp"); + + if (!activity.groups) + activity.groups = {}; + + if (!activity.groups[s.status]) + activity.groups[s.status] = 0; + + activity.groups[s.status]++; + + if (s.status != "accepted") + console.log(s.reason); +} + function Timer() { let startedOn = null;