// Mirror of https://github.com/EvernodeXRPL/hpcore.git
// Synced 2026-04-29 15:37:59 +00:00 - 335 lines, 10 KiB, JavaScript
const HotPocket = require('hotpocket-js-client');
|
|
const azure = require('azure-storage');
|
|
const fs = require('fs').promises;
|
|
const https = require('https');
|
|
const fetch = require('node-fetch');
|
|
|
|
/**
 * Reads an integer setting from the environment.
 *
 * Environment variables are always strings; using them unparsed makes
 * arithmetic such as `i += stateBatchSize` concatenate instead of add.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset, empty,
 *     not an integer, or below `min`.
 * @param {number} [min=0] - Smallest accepted value.
 * @returns {number} The parsed integer or the fallback.
 */
function envInt(name, fallback, min = 0) {
    const raw = process.env[name];
    if (raw === undefined || raw === '')
        return fallback;

    const parsed = Number.parseInt(raw, 10);
    if (Number.isNaN(parsed) || parsed < min)
        return fallback;

    return parsed;
}

// Timer intervals and the back-off ceiling (all in milliseconds).
const dispatchInterval = envInt('DISPATCH', 1000);
const stateUploadInterval = envInt('STATEUPLOAD', 10000);
const metricsTrackInterval = envInt('METRICSTRACK', 10000);
const backoffDelayMax = envInt('BACKOFFMAX', 60000);

// Batch sizes must be at least 1; a zero-sized batch would never drain a queue.
const eventsBatchSize = envInt('EVENTBATCH', 20, 1);
const stateBatchSize = envInt('STATEBATCH', 20, 1);

// Optional file logging switches; the value "on" enables them.
const synclog = process.env.SYNCLOG || "off";
const healthlog = process.env.HEALTHLOG || "off";

// Values assigned once by main().
let keys = null;
let vultrApiKey = null;
let azureTable = null;
let tableSvc = null;

// Per-cluster state, keyed by cluster name.
const clusterQueues = {}; // Pending events waiting to be dispatched.
const metrics = {};       // Count of reported events.
const nodeGroups = {};    // Node state objects.
|
|
|
|
/**
 * Program entry point.
 *
 * Generates a HotPocket key pair, loads "config.json", creates the Azure
 * table service, resolves the hosts of every cluster that has streaming
 * enabled and then starts the state uploader, the event dispatcher and the
 * per-cluster streams.
 *
 * Returns early (after logging) when required configuration is missing or
 * when there is nothing to stream.
 *
 * @returns {Promise<void>}
 */
async function main() {

    keys = await HotPocket.generateKeys();

    const pkhex = Buffer.from(keys.publicKey).toString('hex');
    console.log(`My public key is: ${pkhex}`);

    // Load cluster config.
    const config = JSON.parse(await fs.readFile("config.json"));

    // The vultr section is only needed for "vultr:" hosts, so its absence is not fatal.
    vultrApiKey = config.vultr ? config.vultr.api_key : null;

    // Validate everything we need before starting any background timer.
    if (!config.azure_table) {
        console.log("Azure table config missing.");
        return;
    }
    if (!config.azure_function) {
        console.log("Azure function config missing.");
        return;
    }

    // Create Azure table service.
    tableSvc = azure.createTableServiceWithSas(config.azure_table.host, config.azure_table.sas);
    azureTable = config.azure_table.table;

    // We only consider clusters with stream=true.
    const clusters = (config.contracts || []).filter(c => c.stream == true).map(c => ({
        name: c.name,
        hosts: c.hosts,
        userPort: c.config.user.port
    }));
    console.log(`${clusters.length} clusters found with streaming enabled.`);
    if (clusters.length === 0)
        return;

    // Resolve any vultr hosts.
    await Promise.all(clusters.map(c => resolveHosts(c)));
    if (!clusters.some(c => c.hosts.length > 0))
        return;

    // Start node state uploader.
    // This keeps uploading latest node state into table storage.
    nodeStateUploader();

    // Start event dispatcher.
    // This keeps sending events to the ingestion endpoint.
    eventDispatcher(config.azure_function.host, config.azure_function.path);

    // Event metrics tracking is currently disabled.
    // metricsTracker();

    // Start streaming events from all clusters.
    clusters.forEach(c => streamCluster(c));
}
|
|
|
|
/**
 * Expands the host list of a cluster in place.
 *
 * Entries starting with "vultr:" are replaced by the hosts returned from
 * getVultrHosts() for the text that follows the first colon; every other
 * entry is kept as it is. The original ordering is preserved.
 *
 * @param {{name: string, hosts: string[]}} cluster - Cluster whose `hosts` gets replaced.
 * @returns {Promise<void>}
 */
async function resolveHosts(cluster) {

    const expanded = [];

    for (const entry of cluster.hosts) {
        if (!entry.startsWith("vultr:")) {
            expanded.push(entry);
            continue;
        }

        const vultrHosts = await getVultrHosts(entry.split(":")[1]);
        expanded.push(...vultrHosts);
    }

    cluster.hosts = expanded;
    console.log(`${cluster.hosts.length} hosts in '${cluster.name}' cluster.`);
}
|
|
|
|
/**
 * Drains the per-cluster event queues and POSTs them to the ingestion
 * endpoint over HTTPS, then reschedules itself after `dispatchInterval`.
 *
 * Each request carries at most `eventsBatchSize` events per cluster; requests
 * keep being issued until every queue is empty. Events are removed from the
 * queue when the request is created; request errors are only logged.
 *
 * @param {string} hostname - Host of the ingestion endpoint.
 * @param {string} path - Request path on that host.
 */
function eventDispatcher(hostname, path) {

    // Coerce defensively (the setting may originate from an env string) and never
    // allow a zero-sized batch, which would leave the queues full and spin forever.
    const batchSize = Math.max(1, Number.parseInt(eventsBatchSize, 10) || 1);

    // Dispatch all queued events in batches.
    while (true) {
        const payload = {};
        for (const [cluster, queue] of Object.entries(clusterQueues)) {
            if (queue.length > 0)
                payload[cluster] = queue.splice(0, batchSize);
        }

        // Stop once there is no event data remaining.
        if (Object.keys(payload).length === 0)
            break;

        const data = JSON.stringify(payload);
        const req = https.request({
            hostname: hostname,
            port: 443,
            path: path,
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                // Content-Length is a byte count; string length undercounts non-ASCII payloads.
                'Content-Length': Buffer.byteLength(data)
            }
        });

        req.on('error', error => {
            console.error(error);
        });

        req.write(data);
        req.end();
    }

    setTimeout(() => eventDispatcher(hostname, path), dispatchInterval);
}
|
|
|
|
/**
 * Uploads the state of every node flagged with `hasUpdates` to the Azure
 * table, in batches of at most `stateBatchSize` entities per cluster, then
 * reschedules itself after `stateUploadInterval`.
 *
 * The `hasUpdates` flag is cleared when a node is picked up and set again if
 * its batch fails, so the node is retried on the next cycle.
 */
function nodeStateUploader() {

    try {
        const ent = azure.TableUtilities.entityGenerator;

        // Coerce defensively: a string batch size would turn `i += size` into string
        // concatenation and produce wrong slice bounds. Never allow a zero step.
        const batchSize = Math.max(1, Number.parseInt(stateBatchSize, 10) || 1);

        for (const nodes of Object.values(nodeGroups)) {

            // Collect nodes with updates inside this cluster.
            const updated = nodes.filter(n => n.hasUpdates);
            for (const node of updated)
                node.hasUpdates = false;

            for (let i = 0; i < updated.length; i += batchSize) {
                const batch = updated.slice(i, i + batchSize);
                const tableBatch = new azure.TableBatch();

                for (const node of batch) {
                    tableBatch.insertOrReplaceEntity({
                        PartitionKey: ent.String(node.cluster),
                        RowKey: ent.String(node.idx.toString()),
                        Uri: ent.String(node.uri),
                        LastUpdated: ent.DateTime(new Date(node.lastUpdated)),
                        Status: ent.String(node.status),
                        LastLedger: ent.String(JSON.stringify(node.lastLedger))
                    });
                }

                tableSvc.executeBatch(azureTable, tableBatch, (err) => {
                    if (!err)
                        return;

                    console.log(err);
                    // Flag the nodes again so their state is uploaded on the next cycle.
                    for (const node of batch)
                        node.hasUpdates = true;
                });
            }
        }
    }
    catch (err) {
        // Keep the upload loop alive even if one cycle fails unexpectedly.
        console.log(err);
    }

    setTimeout(() => nodeStateUploader(), stateUploadInterval);
}
|
|
|
|
/**
 * Logs the event count recorded in `metrics` for every cluster and then
 * schedules the next run after `metricsTrackInterval`.
 */
function metricsTracker() {

    Object.keys(metrics).forEach((name) => {
        const count = metrics[name];
        console.log(`${name}: ${count} events.`);
    });

    setTimeout(() => metricsTracker(), metricsTrackInterval);
}
|
|
|
|
/**
 * Starts streaming from every host of a cluster.
 *
 * A host entry may be "host" or "host:port". Without a usable port the
 * cluster's `userPort` is used. Nodes are numbered from 1 in list order.
 *
 * @param {{name: string, hosts: string[], userPort: number}} cluster
 * @returns {Promise<void>}
 */
async function streamCluster(cluster) {
    console.log(`Starting to stream cluster '${cluster.name}'...`);

    cluster.hosts.forEach((entry, idx) => {
        let host = entry;
        let port = cluster.userPort;

        if (entry.includes(":")) {
            const parts = entry.split(":");
            host = parts[0];

            // Always parse as decimal; fall back to the cluster port for entries
            // such as "host:" that would otherwise produce a NaN port.
            const parsedPort = Number.parseInt(parts[1], 10);
            if (!Number.isNaN(parsedPort))
                port = parsedPort;
        }

        streamNode(cluster.name, (idx + 1), host, port);
    });
}
|
|
|
|
/**
 * Creates the state object for one node, registers it in `nodeGroups` under
 * its cluster name and starts the client connection for it.
 *
 * @param {string} clusterName - Name of the cluster the node belongs to.
 * @param {number} nodeIdx - Index of the node within the cluster.
 * @param {string} host - Host name or address of the node.
 * @param {number} port - Port used to build the node's wss:// URI.
 */
function streamNode(clusterName, nodeIdx, host, port) {

    const node = {
        cluster: clusterName,
        idx: nodeIdx,
        uri: `wss://${host}:${port}`,
        failureCount: 0,
        lastLedger: null,
        status: null,
        lastUpdated: null
    };

    // Create the cluster's group on first use.
    const group = nodeGroups[clusterName] || (nodeGroups[clusterName] = []);
    group.push(node);

    establishClientConnection(node);
}
|
|
|
|
/**
 * Creates a HotPocket client for the node, wires up its event handlers and
 * connects.
 *
 * On success an "online" event (with the latest ledger and vote status) is
 * reported and the client subscribes to ledger events. On any failure
 * (connect returning false, a disconnect, or an error thrown during setup)
 * onConnectionFail() is invoked exactly once for this client, which schedules
 * the next attempt.
 *
 * @param {object} node - Node state object created by streamNode().
 * @returns {Promise<void>} Never rejects; failures are routed to onConnectionFail().
 */
async function establishClientConnection(node) {

    // A failed connect may be accompanied by a disconnect event; make sure only
    // one reconnect gets scheduled per client.
    let failureReported = false;
    const fail = () => {
        if (failureReported)
            return;
        failureReported = true;
        onConnectionFail(node);
    };

    let hpc = null;

    try {
        hpc = await HotPocket.createClient([node.uri], keys, { connectionTimeoutMs: 2000 });

        hpc.on(HotPocket.events.disconnect, fail);

        // This will get fired when any ledger event occurs (ledger created, sync status change).
        hpc.on(HotPocket.events.ledgerEvent, (ev) => {
            reportEvent(node, ev);
        });

        // This will get fired when any diagnostic health event occurs.
        if (healthlog === "on") {
            hpc.on(HotPocket.events.healthEvent, async (ev) => {

                const now = new Date().toUTCString();

                let logFile = null;
                if (ev.event === "proposal")
                    logFile = "prop_health.log";
                else if (ev.event === "connectivity")
                    logFile = "conn_health.log";

                if (!logFile)
                    return;

                // The event type is implied by the log file, so leave it out of the record.
                delete ev.event;
                const str = JSON.stringify(ev);

                try {
                    await fs.appendFile(logFile, `${now}, Node${node.idx}, ${node.uri}, ${node.status}, ${str}\n`);
                }
                catch (err) {
                    console.log(`Failed to write ${logFile}: ${err}`);
                }
            });

            await hpc.subscribe(HotPocket.notificationChannels.healthEvent);
        }

        // Establish HotPocket connection.
        if (!await hpc.connect()) {
            fail();
            return;
        }

        const stat = await hpc.getStatus();
        const lastLedger = await hpc.getLedgerBySeqNo(stat.ledgerSeqNo);

        node.failureCount = 0;
        reportEvent(node, { event: "online", ledger: lastLedger, voteStatus: stat.voteStatus });
        await hpc.subscribe(HotPocket.notificationChannels.ledgerEvent);
    }
    catch (err) {
        console.log(`${node.uri} connection setup error: ${err}`);

        // NOTE(review): assumes the client exposes close(); guarded so a client
        // without it still works. Closing avoids a half-initialized client that
        // keeps emitting events alongside the retry.
        if (hpc && typeof hpc.close === "function") {
            try {
                await hpc.close();
            }
            catch (closeErr) {
                console.log(`${node.uri} close error: ${closeErr}`);
            }
        }

        fail();
    }
}
|
|
|
|
/**
 * Handles a failed or dropped connection for a node.
 *
 * Increments the node's failure count, reports an "offline" event and
 * schedules a new connection attempt. The delay grows by 2000ms per
 * consecutive failure and is capped at `backoffDelayMax`.
 *
 * @param {object} node - Node state object whose connection failed.
 */
function onConnectionFail(node) {

    node.failureCount += 1;

    // Linear back-off, limited by the configured ceiling.
    const linearDelay = 2000 * node.failureCount;
    const delay = (linearDelay > backoffDelayMax) ? backoffDelayMax : linearDelay;

    console.log(`${node.uri} connection failed. Backoff ${delay}ms.`);

    // Mark the node offline, then retry once the delay has passed.
    reportEvent(node, { event: "offline" });
    setTimeout(() => establishClientConnection(node), delay);
}
|
|
|
|
/**
 * Records an event for a node.
 *
 * The event is queued for dispatch under the node's cluster, the node's
 * status / last ledger are updated according to the event type, the node is
 * flagged for state upload and the cluster's event counter is incremented.
 * When `synclog` is "on", vote status changes are also appended to
 * "sync_ops.log".
 *
 * @param {object} node - Node state object the event belongs to.
 * @param {object} ev - Event; `ev.event` is one of 'ledger_created',
 *     'vote_status', 'online' or 'offline' (other values are only queued and counted).
 * @returns {Promise<void>} Resolves once optional logging has finished; never
 *     rejects because of a log write failure.
 */
async function reportEvent(node, ev) {

    const ts = Date.now(); // Epoch milliseconds.

    if (!clusterQueues[node.cluster])
        clusterQueues[node.cluster] = [];

    clusterQueues[node.cluster].push({
        cluster: node.cluster,
        idx: node.idx,
        uri: node.uri,
        timestamp: ts,
        data: ev
    });

    if (ev.event === 'ledger_created') {
        node.status = 'in_sync';
        node.lastLedger = ev.ledger;
    }
    else if (ev.event === 'vote_status') {
        // ev.voteStatus - possible values: 'unreliable', 'desync', 'synced'
        node.status = ev.voteStatus === 'desync' ? 'desync' : 'in_sync';
    }
    else if (ev.event === 'online') {
        node.status = ev.voteStatus === 'desync' ? 'desync' : 'in_sync';
        node.lastLedger = ev.ledger;
    }
    else if (ev.event === 'offline') {
        node.status = 'offline';
    }

    // Update all in-memory state before any await, so that events are applied in
    // arrival order and a slow or failing log write cannot delay or drop it.
    node.hasUpdates = true;
    node.lastUpdated = ts;

    const count = metrics[node.cluster];
    metrics[node.cluster] = count ? (count + 1) : 1;

    if (ev.event === 'vote_status' && synclog === "on") {
        // A vote status change can arrive before any ledger is known for the node.
        const seqNo = node.lastLedger ? node.lastLedger.seqNo : 'unknown';
        try {
            await fs.appendFile("sync_ops.log", `${new Date(ts).toUTCString()}, Node${node.idx}, ${node.uri}, ${ev.voteStatus}, at ${seqNo}\n`);
        }
        catch (err) {
            console.log(`Failed to write sync_ops.log: ${err}`);
        }
    }
}
|
|
|
|
/**
 * Looks up the Vultr instances carrying the given tag and returns their main
 * IP addresses, ordered by instance label.
 *
 * @param {string} group - Vultr tag identifying the instance group.
 * @returns {Promise<string[]>} IP addresses; an empty array when the group is
 *     empty or the lookup fails (the failure is logged). Never rejects.
 */
async function getVultrHosts(group) {

    // Nothing to look up; do not hit the API with an empty tag.
    if (!group || group.trim().length === 0)
        return [];

    try {
        const resp = await fetch(`https://api.vultr.com/v2/instances?tag=${encodeURIComponent(group)}`, {
            method: 'GET',
            headers: { "Authorization": `Bearer ${vultrApiKey}` }
        });

        const vms = (await resp.json()).instances;
        if (!vms) {
            console.log("Failed to get vultr instances.");
            return [];
        }

        // Sort a copy by label so the response array is left untouched.
        const byLabel = (a, b) => (a.label < b.label) ? -1 : ((a.label > b.label) ? 1 : 0);
        return [...vms].sort(byLabel).map(i => i.main_ip);
    }
    catch (err) {
        // Network or parse errors must not leave the caller waiting forever.
        console.log(`Failed to get vultr instances. ${err}`);
        return [];
    }
}
|
|
|
|
// Run the program; report a startup failure instead of leaving the rejection unhandled.
main().catch((err) => {
    console.error(err);
    process.exitCode = 1;
});