From 9c7471a7db7b35e9071c3b66c42900cf330fa1ce Mon Sep 17 00:00:00 2001 From: Savinda Senevirathne Date: Thu, 12 Aug 2021 13:59:36 +0530 Subject: [PATCH] Streamer event upload to Azure function. (#343) * Uploading event data to azure function endpoint to broadcast to end clients. --- test/vm-cluster/stream.js | 50 ++++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/test/vm-cluster/stream.js b/test/vm-cluster/stream.js index 58f6839b..46cb1fd1 100644 --- a/test/vm-cluster/stream.js +++ b/test/vm-cluster/stream.js @@ -14,7 +14,7 @@ let keys = null; let vultrApiKey = null; let azureTable = null; let tableSvc = null; -const queue = []; +const clusterQueues = {}; const metrics = {}; const nodeGroups = {}; @@ -58,7 +58,7 @@ async function main() { // Start event dispatcher. // This keeps sending events to the ingestion endpoint. - eventDispatcher(); + eventDispatcher(config.azure_function.host, config.azure_function.path); // Start event metrics tracker. // metricsTracker(); @@ -75,18 +75,43 @@ async function resolveHosts(cluster) { console.log(`${cluster.hosts.length} hosts in '${cluster.name}' cluster.`) } -function eventDispatcher() { - +function eventDispatcher(hostname, path) { // Dispatch all queued events in batches. - const events = queue.splice(0); + while (true) { + obj = {}; + for (const [cluster, data] of Object.entries(clusterQueues)) { + if (data.length > eventsBatchSize) { + obj[cluster] = data.splice(0, eventsBatchSize); + } + else if (data.length > 0) { + obj[cluster] = data.splice(0); + } + } + // Break the loop if there is no event data remaining. + if (Object.keys(obj).length == 0) + break; - for (let i = 0, j = events.length; i < j; i += eventsBatchSize) { - const batch = events.slice(i, i + eventsBatchSize); + const data = JSON.stringify(obj); + const req = https.request({ + hostname: hostname, + port: 443, + path: path, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Content-Length': data.length + } + }); - // TODO: Upload batch of events. + req.on('error', error => { + console.error(error); + }) + + req.write(data); + req.end(); } - setTimeout(() => eventDispatcher(), dispatchInterval); + setTimeout(() => eventDispatcher(hostname, path), dispatchInterval); } function nodeStateUploader() { @@ -205,7 +230,10 @@ async function reportEvent(node, ev) { const ts = new Date().getTime(); // Epoch milliseconds. - queue.push({ + if (!clusterQueues[node.cluster]) + clusterQueues[node.cluster] = []; + + clusterQueues[node.cluster].push({ cluster: node.cluster, idx: node.idx, uri: node.uri, @@ -226,7 +254,7 @@ async function reportEvent(node, ev) { else if (ev.event == 'offline') { node.status = 'offline'; } - + node.hasUpdates = true; node.lastUpdated = ts;