Streamer event upload to Azure function. (#343)

* Uploading event data to azure function endpoint to broadcast to end clients.
This commit is contained in:
Savinda Senevirathne
2021-08-12 13:59:36 +05:30
committed by GitHub
parent 05191a9f32
commit 9c7471a7db

View File

@@ -14,7 +14,7 @@ let keys = null;
let vultrApiKey = null;
let azureTable = null;
let tableSvc = null;
const queue = [];
const clusterQueues = {};
const metrics = {};
const nodeGroups = {};
@@ -58,7 +58,7 @@ async function main() {
// Start event dispatcher.
// This keeps sending events to the ingestion endpoint.
eventDispatcher();
eventDispatcher(config.azure_function.host, config.azure_function.path);
// Start event metrics tracker.
// metricsTracker();
@@ -75,18 +75,43 @@ async function resolveHosts(cluster) {
console.log(`${cluster.hosts.length} hosts in '${cluster.name}' cluster.`)
}
function eventDispatcher() {
function eventDispatcher(hostname, path) {
// Dispatch all queued events in batches.
const events = queue.splice(0);
while (true) {
obj = {};
for (const [cluster, data] of Object.entries(clusterQueues)) {
if (data.length > eventsBatchSize) {
obj[cluster] = data.splice(0, eventsBatchSize);
}
else if (data.length > 0) {
obj[cluster] = data.splice(0);
}
}
// Break the loop if there is no event data remaining.
if (Object.keys(obj).length == 0)
break;
for (let i = 0, j = events.length; i < j; i += eventsBatchSize) {
const batch = events.slice(i, i + eventsBatchSize);
const data = JSON.stringify(obj);
const req = https.request({
hostname: hostname,
port: 443,
path: path,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': data.length
}
});
// TODO: Upload batch of events.
req.on('error', error => {
console.error(error);
})
req.write(data);
req.end();
}
setTimeout(() => eventDispatcher(), dispatchInterval);
setTimeout(() => eventDispatcher(hostname, path), dispatchInterval);
}
function nodeStateUploader() {
@@ -205,7 +230,10 @@ async function reportEvent(node, ev) {
const ts = new Date().getTime(); // Epoch milliseconds.
queue.push({
if (!clusterQueues[node.cluster])
clusterQueues[node.cluster] = [];
clusterQueues[node.cluster].push({
cluster: node.cluster,
idx: node.idx,
uri: node.uri,
@@ -226,7 +254,7 @@ async function reportEvent(node, ev) {
else if (ev.event == 'offline') {
node.status = 'offline';
}
node.hasUpdates = true;
node.lastUpdated = ts;