diff --git a/examples/js_client/lib/hp-client-lib.js b/examples/js_client/lib/hp-client-lib.js index 49f282c6..fb2b7b0c 100644 --- a/examples/js_client/lib/hp-client-lib.js +++ b/examples/js_client/lib/hp-client-lib.js @@ -22,7 +22,7 @@ TextDecoder = util.TextDecoder; } - const supportedHpVersion = "0.5.0"; + const supportedHpVersion = "0.5."; const serverChallengeSize = 16; const outputValidationPassThreshold = 0.8; const connectionCheckIntervalMs = 1000; @@ -52,14 +52,16 @@ contractReadResponse: "contract_read_response", connectionChange: "connection_change", unlChange: "unl_change", - ledgerEvent: "ledger_event" + ledgerEvent: "ledger_event", + healthEvent: "health_event" } Object.freeze(events); /*--- Included in public interface. ---*/ const notificationChannels = { unlChange: "unl_change", - ledgerEvent: "ledger_event" + ledgerEvent: "ledger_event", + healthEvent: "health_event" } Object.freeze(notificationChannels); @@ -181,6 +183,7 @@ // Subscribe for unl changes if we have to maintain the trusted server key checks. subscriptions[notificationChannels.unlChange] = trustedKeysLookup ? true : false; subscriptions[notificationChannels.ledgerEvent] = false; + subscriptions[notificationChannels.healthEvent] = false; let status = 0; //0:none, 1:connected, 2:closed @@ -548,8 +551,8 @@ if (connectionStatus == 0 && m.type == "user_challenge" && m.hp_version && m.contract_id) { - if (m.hp_version != supportedHpVersion) { - liblog(1, `Incompatible Hot Pocket server version. Expected:${supportedHpVersion} Got:${m.hp_version}`); + if (!m.hp_version.startsWith(supportedHpVersion)) { + liblog(1, `Incompatible Hot Pocket server version. Expected:${supportedHpVersion}* Got:${m.hp_version}`); return false; } else if (!m.contract_id) { @@ -665,6 +668,7 @@ contractExecutionEnabled: m.contract_execution_enabled, readRequestsEnabled: m.read_requests_enabled, isFullHistoryNode: m.is_full_history_node, + weaklyConnected: m.weakly_connected, currentUnl: m.current_unl.map(u => msgHelper.deserializeValue(u)), peers: m.peers }); @@ -691,6 +695,10 @@ ev.inSync = m.in_sync; emitter.emit(events.ledgerEvent, ev); } + else if (m.type == "health_event") { + const ev = msgHelper.deserializeHealthEvent(m); + emitter.emit(events.healthEvent, ev); + } else if (m.type == "ledger_query_result") { const resolver = ledgerQueryResolvers[m.reply_for]; if (resolver) { @@ -1142,6 +1150,24 @@ outputHash: this.deserializeValue(l.output_hash) } } + + this.deserializeHealthEvent = (m) => { + if (m.event === "proposal") { + return { + event: m.event, + commLatency: m.comm_latency, + readLatency: m.read_latency, + batchSize: m.batch_size + } + } + else if (m.event === "connectivity") { + return { + event: m.event, + peerCount: m.peer_count, + weaklyConnected: m.weakly_connected + } + } + } } function hexToUint8Array(hexString) { diff --git a/examples/js_client/text-client.js b/examples/js_client/text-client.js index 09431412..60781180 100644 --- a/examples/js_client/text-client.js +++ b/examples/js_client/text-client.js @@ -54,12 +54,16 @@ async function main() { // This will get fired when contract sends outputs. hpc.on(HotPocket.events.contractOutput, (r) => { - r.outputs.forEach(o => console.log(`Output (ledger:${r.ledgerSeqNo})>> ${o}`)); + r.outputs.forEach(o => { + const outputLog = o.length <= 512 ? o : `[Big output (${o.length / 1024} KB)]`; + console.log(`Output (ledger:${r.ledgerSeqNo})>> ${outputLog}`); + }); }) // This will get fired when contract sends a read response. - hpc.on(HotPocket.events.contractReadResponse, (response) => { - console.log("Read response>> " + response); + hpc.on(HotPocket.events.contractReadResponse, (o) => { + const outputLog = o.length <= 512 ? o : `[Big output (${o.length / 1024} KB)]`; + console.log("Read response>> " + outputLog); }) // This will get fired when the unl public key list changes. @@ -73,6 +77,11 @@ async function main() { console.log(ev); }) + // This will get fired when any health event occurs (proposal stats, connectivity changes...). + hpc.on(HotPocket.events.healthEvent, (ev) => { + console.log(ev); + }) + // Establish HotPocket connection. if (!await hpc.connect()) { console.log('Connection failed.'); @@ -83,6 +92,7 @@ async function main() { // After connecting, we can subscribe to events from the HotPocket node. // await hpc.subscribe(HotPocket.notificationChannels.unlChange); // await hpc.subscribe(HotPocket.notificationChannels.ledgerEvent); + // await hpc.subscribe(HotPocket.notificationChannels.healthEvent); // start listening for stdin const rl = readline.createInterface({ @@ -110,7 +120,25 @@ async function main() { hpc.getLedgerBySeqNo(parseInt(inp.substr(7)), true, true) .then(result => console.log(result)); } + else if (inp.startsWith("health ")) { + if (inp.endsWith("on")) + hpc.subscribe(HotPocket.notificationChannels.healthEvent); + else if (inp.endsWith("off")) + hpc.unsubscribe(HotPocket.notificationChannels.healthEvent); + } + else if (inp === "stat") { + hpc.getStatus().then(stat => console.log(stat)); + } else { + + if (inp.startsWith("upload ")) { + const size = parseInt(inp.split(" ")[1]); + if (!isNaN(size)) { + inp = "A".repeat(size * 1024 * 1024); + console.log("Uploading " + size + " MB payload..."); + } + } + hpc.submitContractInput(inp).then(input => { // console.log(input.hash); input.submissionStatus.then(s => { diff --git a/examples/nodejs_contract/diagnostic_contract.js b/examples/nodejs_contract/diagnostic_contract.js new file mode 100644 index 00000000..21988d9f --- /dev/null +++ b/examples/nodejs_contract/diagnostic_contract.js @@ -0,0 +1,138 @@ +const HotPocket = require("./hp-contract-lib"); +const fs = require('fs').promises; +var seedrandom = require('seedrandom'); + +const filename = "file.dat"; +const autofilePrefix = "autofile"; +const autofileSize = 1 * 1024 * 1024; + +const diagnosticContract = async (ctx) => { + + // Collection of per-user promises to wait for. Each promise completes when inputs for that user is processed. + const userHandlers = []; + + for (const user of ctx.users.list()) { + + // For each user we add a promise to list of promises. + userHandlers.push(new Promise(async (resolve) => { + + // The contract need to ensure that all outputs for a particular user is emitted + // in deterministic order. Hence, we are processing all inputs for each user sequentially. + for (const input of user.inputs) { + + const buf = await ctx.users.read(input); + const parts = buf.toString().split(" "); + const mode = parts[0]; + const data = parts[1]; + let output = null; + + if (mode === "status") { + output = "Hot Pocket diagnostic contract is running."; + } + else if (mode === "file") { + const param = parseInt(data); + const stat = await fs.stat(filename).catch(() => { }); + + if (isNaN(param)) { + if (!stat) + output = "File does not exist."; + else + output = "Current size: " + stat.size / (1024 * 1024) + " MB"; + } + else { + if (param == 0) { + if (stat) { + await fs.unlink(filename); + output = "Deleted file."; + } + } + else { + if (!stat) + await fs.writeFile(filename, "Initial"); + + await fs.truncate(filename, param * 1024 * 1024); + output = "Updated file size to " + param + " MB"; + } + } + } + else if (mode === "files") { + const param = parseInt(data); + const autofiles = await (await fs.readdir(".")).filter(f => f.startsWith(autofilePrefix)); + + if (isNaN(param)) { + output = autofiles.length + " autofiles found."; + } + else { + if (param == 0) { + for (file of autofiles) { + await fs.unlink(file); + } + output = autofiles.length + " autofiles deleted."; + } + else { + const content = "A".repeat(autofileSize); + for (let i = (autofiles.length + 1); i <= (autofiles.length + param); i++) { + await fs.writeFile(autofilePrefix + i, content); + } + output = param + " new autofiles created. Total: " + (autofiles.length + param); + } + } + } + else if (mode === "download") { + const param = parseFloat(data); + if (!isNaN(param)) { + output = "A".repeat(param * 1024 * 1024); + } + } + else if (mode === "roundtime") { + const param = parseInt(data); + if (!isNaN(param)) { + if (param >= 100) { + const config = await ctx.getConfig(); + config.roundtime = param; + await ctx.updateConfig(config) + output = "Updated Roundtime to " + config.roundtime; + } + } + else { + const config = await ctx.getConfig(); + output = "Roundtime: " + config.roundtime; + } + } + else { + output = "Received unrecognized input of length " + buf.length; + } + + if (output) + await user.send(output); + } + + // The promise gets completed when all inputs for this user are processed. + resolve(); + })); + } + + // Wait until all user promises are complete. + await Promise.all(userHandlers); + + // Modify random file bytes (if file exists) + { + const stat = await fs.stat(filename).catch(() => { }); + if (stat) { + const rng = seedrandom(ctx.lcl_hash); + const fh = await fs.open(filename, 'r+'); + + for (let i = 0; i < 3; i++) { + const pos = rng() * (stat.size - 50); + const buf = ctx.lcl_hash.substr(i * 10, 10); + await fh.write(buf, pos); + } + + await fh.close(); + } + } + +} + +const hpc = new HotPocket.Contract(); +hpc.init(diagnosticContract); \ No newline at end of file diff --git a/examples/nodejs_contract/package-lock.json b/examples/nodejs_contract/package-lock.json index 9b14263f..5282962d 100644 --- a/examples/nodejs_contract/package-lock.json +++ b/examples/nodejs_contract/package-lock.json @@ -34,6 +34,11 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==" + }, + "seedrandom": { + "version": "3.0.5", + "resolved": "https://registry.npmjs.org/seedrandom/-/seedrandom-3.0.5.tgz", + "integrity": "sha512-8OwmbklUNzwezjGInmZ+2clQmExQPvomqjL7LFqOYqtmuxRgQYqOD3mHaU+MvZn5FLUeVxVfQjwLZW/n/JFuqg==" } } } diff --git a/examples/nodejs_contract/package.json b/examples/nodejs_contract/package.json index 22af9324..b456e9f4 100644 --- a/examples/nodejs_contract/package.json +++ b/examples/nodejs_contract/package.json @@ -1,9 +1,11 @@ { "scripts": { "build-echo": "ncc build echo_contract.js -o dist/echo-contract", - "build-file": "ncc build file_contract.js -o dist/file-contract" + "build-file": "ncc build file_contract.js -o dist/file-contract", + "build-diag": "ncc build diagnostic_contract.js -o dist/diagnostic-contract" }, "dependencies": { - "bson": "4.0.4" + "bson": "4.0.4", + "seedrandom": "3.0.5" } } diff --git a/src/conf.hpp b/src/conf.hpp index ec991d7c..28589f89 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -256,6 +256,12 @@ namespace conf size_t max_file_count = 0; // Max no. of log files to keep. }; + struct health_config + { + bool proposal_stats = false; + bool connectivity_stats = false; + }; + // Holds all the config values. struct hp_config { @@ -265,6 +271,7 @@ namespace conf user_config user; hpfs_config hpfs; log_config log; + health_config health; // For debugging only. Not included in the config file. }; // Global contract context struct exposed to the application. diff --git a/src/consensus.cpp b/src/consensus.cpp index b1b6e27c..9ec80e1f 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -92,6 +92,9 @@ namespace consensus break; } + if (ctx.stage == 0) + status::emit_proposal_health(); + if (consensus() == -1) { LOG_ERROR << "Consensus thread exited due to an error."; @@ -119,7 +122,7 @@ namespace consensus revise_candidate_proposals(ctx.vote_status == VOTES_SYNCED); // Attempt to close the ledger after scanning last round stage 3 proposals. - if (ctx.stage == 0) + if (ctx.stage == 0 && ctx.vote_status == VOTES_SYNCED) attempt_ledger_close(); // Get current lcl, state, patch, primary shard and raw shard info. @@ -156,7 +159,7 @@ namespace consensus const size_t unl_count = unl::count(); vote_counter votes; - // Check whether we are in sync with other nodes using proposals. + // Check whether we are in sync with other nodes using the proposals we received. { int new_sync_status = check_sync_status(unl_count, votes, lcl_id); @@ -174,7 +177,7 @@ namespace consensus } // Update the node's status if we went from in-sync to not-in-sync. We will report back as being in-sync only when ledger is created. - if (ctx.vote_status == VOTES_SYNCED && new_sync_status != VOTES_SYNCED) + if (new_sync_status == VOTES_DESYNC) status::sync_status_changed(false); // This marks entering into a new sync cycle. @@ -195,9 +198,10 @@ namespace consensus if (ctx.vote_status == VOTES_UNRELIABLE) { + ctx.stage = 0; ctx.unreliable_votes_attempts++; - // If we get too many consecative unreliable vote rounds, then we perform time config sniffing just in case the unreliable votes + // If we get too many consecutive unreliable vote rounds, then we perform time config sniffing just in case the unreliable votes // are caused because our roundtime config information is different from other nodes. if (ctx.unreliable_votes_attempts >= MAX_UNRELIABLE_VOTES_ATTEMPTS) { @@ -208,21 +212,21 @@ namespace consensus else { ctx.unreliable_votes_attempts = 0; - } - if (ctx.vote_status == VOTES_SYNCED) - { - // If we are in sync, vote and broadcast the winning votes to next stage. - const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id); - broadcast_proposal(p); - - // This marks the moment we finish a sync cycle. We are in stage 1 and we detect that our votes are in sync. - if (ctx.stage == 1 && ctx.sync_ongoing) + if (ctx.vote_status == VOTES_SYNCED) { - // Clear any sync recovery pending state if we enter stage 1 while being in sync. - ctx.sync_ongoing = false; - status::sync_status_changed(true); - LOG_DEBUG << "Sync recovery completed."; + // If we are in sync, vote and broadcast the winning votes to next stage. + const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id); + broadcast_proposal(p); + + // This marks the moment we finish a sync cycle. We are in stage 1 and we just detected that our votes are in sync. + if (ctx.stage == 1 && ctx.sync_ongoing) + { + // Clear any sync recovery pending state if we enter stage 1 while being in sync. + ctx.sync_ongoing = false; + status::sync_status_changed(true); + LOG_DEBUG << "Sync recovery completed."; + } } } @@ -447,7 +451,7 @@ namespace consensus /** * Moves proposals collected from the network into candidate proposals and * cleans up any outdated proposals from the candidate set. - * @param in_sync Whether the node is currently on sync or not. We relax the pruning criteria if we are not in sync. + * @param in_sync Whether the node is currently in sync or not. We relax the pruning criteria if we are not in sync. */ void revise_candidate_proposals(const bool in_sync) { @@ -459,6 +463,8 @@ namespace consensus collected_proposals.splice(collected_proposals.end(), p2p::ctx.collected_msgs.proposals); } + status::report_proposal_batch(collected_proposals); + // Prune incoming proposals if they are older than existing proposal from same node. { auto itr = collected_proposals.begin(); diff --git a/src/hplog.cpp b/src/hplog.cpp index 998f6351..340f8b09 100644 --- a/src/hplog.cpp +++ b/src/hplog.cpp @@ -43,15 +43,13 @@ namespace hplog plog::util::localtime_s(&t, &record.getTime().time); // local time plog::util::nostringstream ss; - ss << t.tm_year + 1900 << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_mon + 1 << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_mday << PLOG_NSTR(" "); - ss << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_hour << PLOG_NSTR(":") - << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_min << PLOG_NSTR(":") - << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_sec << PLOG_NSTR(" "); - // Uncomment for millseconds. - // << std::setfill(PLOG_NSTR('0')) << std::setw(3) << record.getTime().millitm << PLOG_NSTR(" "); - - ss << PLOG_NSTR("[") << severity_to_string(record.getSeverity()) << PLOG_NSTR("][hpc] "); - ss << record.getMessage() << PLOG_NSTR("\n"); + ss << t.tm_year + 1900 << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_mon + 1 << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_mday + << PLOG_NSTR(" ") << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_hour + << PLOG_NSTR(":") << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_min + << PLOG_NSTR(":") << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_sec + << PLOG_NSTR(".") << std::setfill(PLOG_NSTR('0')) << std::setw(3) << record.getTime().millitm // Uncomment for millseconds. + << PLOG_NSTR(" ") << PLOG_NSTR("[") << severity_to_string(record.getSeverity()) + << PLOG_NSTR("][hpc] ") << record.getMessage() << PLOG_NSTR("\n"); return ss.str(); } diff --git a/src/msg/bson/usrmsg_bson.cpp b/src/msg/bson/usrmsg_bson.cpp index 41c23116..aad7392f 100644 --- a/src/msg/bson/usrmsg_bson.cpp +++ b/src/msg/bson/usrmsg_bson.cpp @@ -24,6 +24,7 @@ namespace msg::usrmsg::bson * "contract_execution_enabled": true | false, * "read_requests_enabled": true | false, * "is_full_history_node": true | false, + * "weakly_connected": true | false, * "current_unl": [ , ... ], * "peers": [ "ip:port", ... ] * } @@ -33,6 +34,7 @@ namespace msg::usrmsg::bson const util::sequence_hash lcl_id = status::get_lcl_id(); const std::set unl = status::get_unl(); const bool in_sync = status::is_in_sync(); + const bool weakly_connected = status::get_weakly_connected(); jsoncons::bson::bson_bytes_encoder encoder(msg); encoder.begin_object(); @@ -54,6 +56,8 @@ namespace msg::usrmsg::bson encoder.bool_value(conf::cfg.user.concurrent_read_requests != 0); encoder.key(msg::usrmsg::FLD_IS_FULL_HISTORY_NODE); encoder.bool_value(conf::cfg.node.history == conf::HISTORY::FULL); + encoder.key(msg::usrmsg::FLD_WEAKLY_CONNECTED); + encoder.bool_value(weakly_connected); encoder.key(msg::usrmsg::FLD_CURRENT_UNL); encoder.begin_array(); @@ -315,6 +319,70 @@ namespace msg::usrmsg::bson encoder.flush(); } + /** + * Constructs health stat message. + * @param msg Buffer to construct the generated bson message into. + * Message format: + * { + * "type": "health_event", + * "proposals": { + * "comm_latency": {min:0, max:0, avg:0}, + * "read_latency": {min:0, max:0, avg:0} + * "batch_size": 0 + * }, + * "peer_count": 0, + * "weakly_connected": true | false + * } + * @param ev Current health information. + */ + void create_health_notification(std::vector &msg, const status::health_event &ev) + { + jsoncons::bson::bson_bytes_encoder encoder(msg); + encoder.begin_object(); + encoder.key(msg::usrmsg::FLD_TYPE); + encoder.string_value(msg::usrmsg::MSGTYPE_HEALTH_EVENT); + encoder.key(msg::usrmsg::FLD_EVENT); + + if (ev.index() == 0) + { + const status::proposal_health &phealth = std::get(ev); + + encoder.string_value(msg::usrmsg::HEALTH_EVENT_PROPOSAL); + encoder.key(msg::usrmsg::FLD_COMM_LATENCY); + encoder.begin_object(); + encoder.key(msg::usrmsg::FLD_MIN); + encoder.uint64_value(phealth.comm_latency_min); + encoder.key(msg::usrmsg::FLD_MAX); + encoder.uint64_value(phealth.comm_latency_max); + encoder.key(msg::usrmsg::FLD_AVG); + encoder.uint64_value(phealth.comm_latency_avg); + encoder.end_object(); + encoder.key(msg::usrmsg::FLD_READ_LATENCY); + encoder.begin_object(); + encoder.key(msg::usrmsg::FLD_MIN); + encoder.uint64_value(phealth.read_latency_min); + encoder.key(msg::usrmsg::FLD_MAX); + encoder.uint64_value(phealth.read_latency_max); + encoder.key(msg::usrmsg::FLD_AVG); + encoder.uint64_value(phealth.read_latency_avg); + encoder.end_object(); + encoder.key(msg::usrmsg::FLD_BATCH_SIZE); + encoder.uint64_value(phealth.batch_size); + } + else if (ev.index() == 1) + { + const status::connectivity_health &conn = std::get(ev); + encoder.string_value(msg::usrmsg::HEALTH_EVENT_CONNECTIVITY); + encoder.key(msg::usrmsg::FLD_PEER_COUNT); + encoder.uint64_value(conn.peer_count); + encoder.key(msg::usrmsg::FLD_WEAKLY_CONNECTED); + encoder.bool_value(conn.is_weakly_connected); + } + + encoder.end_object(); + encoder.flush(); + } + /** * Constructs a ledger query response. * @param msg Buffer to construct the generated bson message string into. @@ -533,6 +601,11 @@ namespace msg::usrmsg::bson { channel = usr::NOTIFICATION_CHANNEL::UNL_CHANGE; } + else if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_HEALTH_EVENT && + (conf::cfg.health.proposal_stats || conf::cfg.health.connectivity_stats)) + { + channel = usr::NOTIFICATION_CHANNEL::HEALTH_STAT; + } else { LOG_DEBUG << "User subscription request invalid channel."; diff --git a/src/msg/bson/usrmsg_bson.hpp b/src/msg/bson/usrmsg_bson.hpp index 64f74bef..8791bec6 100644 --- a/src/msg/bson/usrmsg_bson.hpp +++ b/src/msg/bson/usrmsg_bson.hpp @@ -5,6 +5,7 @@ #include "../../util/merkle_hash_tree.hpp" #include "../../ledger/ledger_query.hpp" #include "../../usr/user_common.hpp" +#include "../../status.hpp" namespace msg::usrmsg::bson { @@ -28,6 +29,8 @@ namespace msg::usrmsg::bson void create_sync_status_notification(std::vector &msg, const bool in_sync); + void create_health_notification(std::vector &msg, const status::health_event &ev); + void create_ledger_query_response(std::vector &msg, std::string_view reply_for, const ledger::query::query_result &result); diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index 730208a6..f07b19b2 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -44,7 +44,7 @@ namespace msg::fbuf::p2pmsg if (session && session->challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED && message.size() <= MAX_SIZE_FOR_TIME_CHECK) { const uint64_t time_now = util::get_epoch_milliseconds(); - if (p2p_msg->created_on() < (time_now - (conf::cfg.contract.roundtime * 4))) + if (p2p_msg->created_on() < (time_now - (conf::cfg.contract.roundtime * 3))) { LOG_DEBUG << "Peer message is too old. type:" << p2p_msg->content_type() << " from:" << (session ? session->display_name() : ""); return p2p::peer_message_info{NULL, P2PMsgContent_NONE, 0}; diff --git a/src/msg/json/usrmsg_json.cpp b/src/msg/json/usrmsg_json.cpp index 6f3339cf..f5c9494c 100644 --- a/src/msg/json/usrmsg_json.cpp +++ b/src/msg/json/usrmsg_json.cpp @@ -143,6 +143,7 @@ namespace msg::usrmsg::json * "contract_execution_enabled": true | false, * "read_requests_enabled": true | false, * "is_full_history_node": true | false, + * "weakly_connected": true | false, * "current_unl": [ """, ... ], * "peers": [ "ip:port", ... ] * } @@ -152,6 +153,7 @@ namespace msg::usrmsg::json const util::sequence_hash lcl_id = status::get_lcl_id(); const std::set unl = status::get_unl(); const bool in_sync = status::is_in_sync(); + const bool weakly_connected = status::get_weakly_connected(); msg.reserve(1024); msg += "{\""; @@ -191,6 +193,11 @@ namespace msg::usrmsg::json msg += SEP_COLON_NOQUOTE; msg += conf::cfg.node.history == conf::HISTORY::FULL ? STR_TRUE : STR_FALSE; msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_WEAKLY_CONNECTED; + msg += SEP_COLON_NOQUOTE; + msg += weakly_connected ? STR_TRUE : STR_FALSE; + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_CURRENT_UNL; msg += SEP_COLON_NOQUOTE; msg += OPEN_SQR_BRACKET; @@ -546,6 +553,97 @@ namespace msg::usrmsg::json msg += "}"; } + /** + * Constructs health stat message. + * @param msg Buffer to construct the generated json message string into. + * Message format: + * { + * "type": "health_event", + * "event": "proposal" | "connectivity", + * + * // proposal + * "comm_latency": {min:0, max:0, avg:0}, + * "read_latency": {min:0, max:0, avg:0} + * "batch_size": 0 + * + * // connectivity + * "peer_count": 0, + * "weakly_connected": true | false + * } + * @param ev Current health information. + */ + void create_health_notification(std::vector &msg, const status::health_event &ev) + { + msg.reserve(128); + msg += "{\""; + msg += msg::usrmsg::FLD_TYPE; + msg += SEP_COLON; + msg += msg::usrmsg::MSGTYPE_HEALTH_EVENT; + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_EVENT; + msg += SEP_COLON; + + if (ev.index() == 0) + { + const status::proposal_health &phealth = std::get(ev); + + msg += msg::usrmsg::HEALTH_EVENT_PROPOSAL; + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_COMM_LATENCY; + msg += SEP_COLON_NOQUOTE; + msg += "{\""; + msg += msg::usrmsg::FLD_MIN; + msg += SEP_COLON_NOQUOTE; + msg += std::to_string(phealth.comm_latency_min); + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_MAX; + msg += SEP_COLON_NOQUOTE; + msg += std::to_string(phealth.comm_latency_max); + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_AVG; + msg += SEP_COLON_NOQUOTE; + msg += std::to_string(phealth.comm_latency_avg); + msg += "}"; + + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_READ_LATENCY; + msg += SEP_COLON_NOQUOTE; + msg += "{\""; + msg += msg::usrmsg::FLD_MIN; + msg += SEP_COLON_NOQUOTE; + msg += std::to_string(phealth.read_latency_min); + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_MAX; + msg += SEP_COLON_NOQUOTE; + msg += std::to_string(phealth.read_latency_max); + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_AVG; + msg += SEP_COLON_NOQUOTE; + msg += std::to_string(phealth.read_latency_avg); + msg += "}"; + + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_BATCH_SIZE; + msg += SEP_COLON_NOQUOTE; + msg += std::to_string(phealth.batch_size); + } + else if (ev.index() == 1) + { + const status::connectivity_health &conn = std::get(ev); + msg += msg::usrmsg::HEALTH_EVENT_CONNECTIVITY; + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_PEER_COUNT; + msg += SEP_COLON_NOQUOTE; + msg += std::to_string(conn.peer_count); + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_WEAKLY_CONNECTED; + msg += SEP_COLON_NOQUOTE; + msg += conn.is_weakly_connected ? STR_TRUE : STR_FALSE; + } + + msg += "}"; + } + /** * Constructs a ledger query response. * @param msg Buffer to construct the generated json message string into. @@ -892,6 +990,11 @@ namespace msg::usrmsg::json { channel = usr::NOTIFICATION_CHANNEL::UNL_CHANGE; } + else if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_HEALTH_EVENT && + (conf::cfg.health.proposal_stats || conf::cfg.health.connectivity_stats)) + { + channel = usr::NOTIFICATION_CHANNEL::HEALTH_STAT; + } else { LOG_DEBUG << "User subscription request invalid channel."; diff --git a/src/msg/json/usrmsg_json.hpp b/src/msg/json/usrmsg_json.hpp index d06381e3..d21189d3 100644 --- a/src/msg/json/usrmsg_json.hpp +++ b/src/msg/json/usrmsg_json.hpp @@ -5,6 +5,7 @@ #include "../../util/merkle_hash_tree.hpp" #include "../../ledger/ledger_query.hpp" #include "../../usr/user_common.hpp" +#include "../../status.hpp" namespace msg::usrmsg::json { @@ -32,6 +33,8 @@ namespace msg::usrmsg::json void create_sync_status_notification(std::vector &msg, const bool in_sync); + void create_health_notification(std::vector &msg, const status::health_event &ev); + void create_ledger_query_response(std::vector &msg, std::string_view reply_for, const ledger::query::query_result &result); diff --git a/src/msg/usrmsg_common.hpp b/src/msg/usrmsg_common.hpp index 0be8d771..26f66265 100644 --- a/src/msg/usrmsg_common.hpp +++ b/src/msg/usrmsg_common.hpp @@ -9,7 +9,7 @@ namespace msg::usrmsg constexpr size_t CHALLENGE_LEN = 16; // Max no. of known peers to return in get status. - constexpr const size_t MAX_KNOWN_PEERS_INFO = 16; + constexpr const size_t MAX_KNOWN_PEERS_INFO = 100; // Message field names constexpr const char *FLD_HP_VERSION = "hp_version"; @@ -65,6 +65,14 @@ namespace msg::usrmsg constexpr const char *FLD_LEDGER = "ledger"; constexpr const char *FLD_CHANNEL = "channel"; constexpr const char *FLD_ENABLED = "enabled"; + constexpr const char *FLD_COMM_LATENCY = "comm_latency"; + constexpr const char *FLD_READ_LATENCY = "read_latency"; + constexpr const char *FLD_BATCH_SIZE = "batch_size"; + constexpr const char *FLD_MIN = "min"; + constexpr const char *FLD_MAX = "max"; + constexpr const char *FLD_AVG = "avg"; + constexpr const char *FLD_PEER_COUNT = "peer_count"; + constexpr const char *FLD_WEAKLY_CONNECTED = "weakly_connected"; // Message types constexpr const char *MSGTYPE_USER_CHALLENGE = "user_challenge"; @@ -81,6 +89,7 @@ namespace msg::usrmsg constexpr const char *MSGTYPE_LCL_RESPONSE = "lcl_response"; constexpr const char *MSGTYPE_UNL_CHANGE = "unl_change"; constexpr const char *MSGTYPE_LEDGER_EVENT = "ledger_event"; + constexpr const char *MSGTYPE_HEALTH_EVENT = "health_event"; constexpr const char *MSGTYPE_LEDGER_QUERY = "ledger_query"; constexpr const char *MSGTYPE_LEDGER_QUERY_RESULT = "ledger_query_result"; constexpr const char *MSGTYPE_SUBSCRIPTION = "subscription"; @@ -103,7 +112,8 @@ namespace msg::usrmsg constexpr const char *STR_FALSE = "false"; constexpr const char *LEDGER_EVENT_LEDGER_CREATED = "ledger_created"; constexpr const char *LEDGER_EVENT_SYNC_STATUS = "sync_status"; - + constexpr const char *HEALTH_EVENT_PROPOSAL = "proposal"; + constexpr const char *HEALTH_EVENT_CONNECTIVITY = "connectivity"; } // namespace msg::usrmsg diff --git a/src/msg/usrmsg_parser.cpp b/src/msg/usrmsg_parser.cpp index a42ae6a8..bd56d5ca 100644 --- a/src/msg/usrmsg_parser.cpp +++ b/src/msg/usrmsg_parser.cpp @@ -80,6 +80,14 @@ namespace msg::usrmsg busrmsg::create_sync_status_notification(msg, in_sync); } + void usrmsg_parser::create_health_notification(std::vector &msg, const status::health_event &ev) const + { + if (protocol == util::PROTOCOL::JSON) + jusrmsg::create_health_notification(msg, ev); + else + busrmsg::create_health_notification(msg, ev); + } + void usrmsg_parser::create_ledger_query_response(std::vector &msg, std::string_view reply_for, const ledger::query::query_result &result) const { diff --git a/src/msg/usrmsg_parser.hpp b/src/msg/usrmsg_parser.hpp index 0efa11a0..e3012df9 100644 --- a/src/msg/usrmsg_parser.hpp +++ b/src/msg/usrmsg_parser.hpp @@ -6,6 +6,7 @@ #include "../util/merkle_hash_tree.hpp" #include "../ledger/ledger_query.hpp" #include "../usr/user_common.hpp" +#include "../status.hpp" namespace msg::usrmsg { @@ -37,6 +38,8 @@ namespace msg::usrmsg void create_sync_status_notification(std::vector &msg, const bool in_sync) const; + void create_health_notification(std::vector &msg, const status::health_event &ev) const; + void create_ledger_query_response(std::vector &msg, std::string_view reply_for, const ledger::query::query_result &result) const; diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index c40e98f7..a0da4abf 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -14,7 +14,7 @@ namespace p2pmsg = msg::fbuf::p2pmsg; // Maximum no. of peers that will be persisted back to config upon exit. -constexpr size_t MAX_PERSISTED_KNOWN_PEERS = 50; +constexpr size_t MAX_PERSISTED_KNOWN_PEERS = 100; namespace p2p { @@ -409,8 +409,16 @@ namespace p2p */ void send_known_peer_list(peer_comm_session *session) { + const std::vector &peers = ctx.server->req_known_remotes; + + // Add self to known peer announcement (indicated as blank host address). + // peers.push_back(peer_properties{ + // conf::peer_ip_port{"", conf::cfg.mesh.port}, + // status::get_available_mesh_capacity(), + // util::get_epoch_milliseconds()}); + flatbuffers::FlatBufferBuilder fbuf; - p2pmsg::create_msg_from_peer_list_response(fbuf, ctx.server->req_known_remotes, session->known_ipport); + p2pmsg::create_msg_from_peer_list_response(fbuf, peers, session->known_ipport); session->send(msg::fbuf::builder_to_string_view(fbuf)); } @@ -432,7 +440,7 @@ namespace p2p itr->available_capacity = available_capacity; itr->timestamp = timestamp; - // Sorting the known remote list according to the weight value after updating the peer properties. + // Sorting the known remote list according to the weight value after updating the peer properties. sort_known_remotes(); } } @@ -452,13 +460,21 @@ namespace p2p /** * Merging the response peer list with the own known peer list. * @param peers Incoming peer list. + * @param from The session that sent us the peer list. */ - void merge_peer_list(const std::vector &peers) + void merge_peer_list(const std::vector &peers, const p2p::peer_comm_session &from) { std::scoped_lock lock(ctx.server->req_known_remotes_mutex); for (const peer_properties &peer : peers) { + // If the peer address is indicated as empty, that is the entry for the peer who sent us this. + // We then fill that up with the host address we see for that peer. + // if (peer.ip_port.host_address.empty()) + // { + // peer.ip_port.host_address = from.host_address; + // } + // If the peer is self, we won't add to the known peer list. if (self::ip_port.has_value() && self::ip_port == peer.ip_port) { @@ -519,7 +535,7 @@ namespace p2p * Calculate and retunrns the available capacity. * @returns -1 if available capacity is unlimited otherwise available value. */ - int16_t get_available_capacity() + int16_t calculate_available_capacity() { // If both max_connections and max_known_connections are configured calculate the capacity. if (conf::cfg.mesh.max_connections != 0 && conf::cfg.mesh.max_known_connections != 0) diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 0708fb2c..7549218e 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -237,11 +237,11 @@ namespace p2p void update_known_peer_available_capacity(const conf::peer_ip_port &ip_port, const int16_t available_capacity, const uint64_t ×tamp); - void merge_peer_list(const std::vector &peers); + void merge_peer_list(const std::vector &peers, const p2p::peer_comm_session &from); void sort_known_remotes(); - int16_t get_available_capacity(); + int16_t calculate_available_capacity(); void update_unl_connections(); diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index 15d6bbd7..69257473 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -11,8 +11,6 @@ namespace p2p { constexpr float WEAKLY_CONNECTED_THRESHOLD = 0.7; - // Globally exposed weakly connected status variable. - bool is_weakly_connected = false; peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size, const uint64_t max_in_connections, const uint64_t max_in_connections_per_host, @@ -45,24 +43,31 @@ namespace p2p uint16_t peer_managing_counter = 0; uint16_t known_connections_counter = 0; + uint16_t available_capacity_counter = 0; while (!is_shutting_down) { peer_managing_counter++; known_connections_counter++; + available_capacity_counter++; - if (known_connections_counter % 20 == 0) + if (known_connections_counter % 40 == 0) { maintain_known_connections(); known_connections_counter = 0; } - // Send available peer capacity if peer max connections is configured. - if (conf::cfg.mesh.max_connections != 0) - p2p::send_available_capacity_announcement(p2p::get_available_capacity()); + if (available_capacity_counter % 300 == 0) + { + status::set_available_mesh_capacity(p2p::calculate_available_capacity()); + + // Send available peer capacity if peer max connections is configured. + if (conf::cfg.mesh.max_connections != 0) + p2p::send_available_capacity_announcement(status::get_available_mesh_capacity()); + } // Start peer list request loop if dynamic peer discovery is enabled. - if (conf::cfg.mesh.peer_discovery.enabled && known_remote_count > 0) + if (conf::cfg.mesh.peer_discovery.enabled) { // If max known peer connection cap is reached then periodically request peer list from random known peer. // Otherwise frequently request peer list from a random known peer. @@ -193,23 +198,32 @@ namespace p2p } } /** - * Check whether the node is weakly connected or strongly connected in every 60 seconds. + * Check whether the node is weakly connected or strongly connected. */ void peer_comm_server::detect_if_weakly_connected() { - if (connected_status_check_counter == 600) + // If the node is already weakly connected, check every 2 seconds whether we are now strongly connected. + // Otherwise check every 60 seconds. This makes it harder to become weakly connected and easier to get out of it. + // This can help with unnessary flooding of forwarded messages across the network. + bool weakly_connected = status::get_weakly_connected(); + if (connected_status_check_counter == (weakly_connected ? 20 : 600)) { // Get the count of peers which are unl nodes. - // One is added to session list size only if we are a unl node, to reflect the self connection. + // One is added to peer count only if we are a unl node, to reflect the self connection. const int connected_peer_count = std::count_if(sessions.begin(), sessions.end(), [](const p2p::peer_comm_session &session) { return session.is_unl; }) + (conf::cfg.node.is_unl ? 1 : 0); const bool current_state = connected_peer_count < (unl::count() * WEAKLY_CONNECTED_THRESHOLD); - if (is_weakly_connected != current_state) + if (weakly_connected != current_state) { - is_weakly_connected = !is_weakly_connected; - send_peer_requirement_announcement(is_weakly_connected); - LOG_DEBUG << "Sent weakly connected announcement."; + weakly_connected = !weakly_connected; + send_peer_requirement_announcement(weakly_connected); + status::set_weakly_connected(weakly_connected); + + if (weakly_connected) + LOG_WARNING << "Became weakly connected."; + else + LOG_INFO << "No longer weakly connected."; } connected_status_check_counter = 0; } diff --git a/src/p2p/peer_comm_server.hpp b/src/p2p/peer_comm_server.hpp index dea9112d..33dce2ec 100644 --- a/src/p2p/peer_comm_server.hpp +++ b/src/p2p/peer_comm_server.hpp @@ -6,9 +6,6 @@ namespace p2p { - // Globally exposed weakly connected status variable. - extern bool is_weakly_connected; - struct peer_properties; class peer_comm_server : public comm::comm_server diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 242c0a69..0b9a38d8 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -13,6 +13,7 @@ #include "p2p.hpp" #include "../unl.hpp" #include "../sc/hpfs_log_sync.hpp" +#include "../status.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; @@ -32,7 +33,7 @@ namespace p2p int handle_peer_connect(p2p::peer_comm_session &session) { // Skip new inbound connection if max inbound connection cap is reached. - if (session.is_inbound && get_available_capacity() == 0) + if (session.is_inbound && calculate_available_capacity() == 0) { LOG_DEBUG << "Max peer connection cap reached. Rejecting new peer connection [" << session.display_name() << "]"; return -1; @@ -148,7 +149,7 @@ namespace p2p if (mi.type == p2pmsg::P2PMsgContent_PeerListResponseMsg) { - p2p::merge_peer_list(p2pmsg::create_peer_list_response_from_msg(mi)); + p2p::merge_peer_list(p2pmsg::create_peer_list_response_from_msg(mi), session); } else if (mi.type == p2pmsg::P2PMsgContent_PeerListRequestMsg) { @@ -334,9 +335,7 @@ namespace p2p void handle_peer_on_verified(p2p::peer_comm_session &session) { // Sending newly verified node the requirement of consensus msg fowarding if this node is weakly connected. - if (p2p::is_weakly_connected) - { - p2p::send_peer_requirement_announcement(is_weakly_connected, &session); - } + if (status::get_weakly_connected()) + p2p::send_peer_requirement_announcement(true, &session); } } // namespace p2p \ No newline at end of file diff --git a/src/status.cpp b/src/status.cpp index 6106858e..d45ddfd5 100644 --- a/src/status.cpp +++ b/src/status.cpp @@ -2,6 +2,7 @@ #include "util/sequence_hash.hpp" #include "ledger/ledger_common.hpp" #include "conf.hpp" +#include "p2p/p2p.hpp" namespace status { @@ -20,6 +21,11 @@ namespace status std::shared_mutex peers_mutex; std::set peers; // Known ip:port pairs for connection verified peers. + std::atomic peer_count = 0; + std::atomic weakly_connected = false; + std::atomic available_mesh_capacity = -1; + + proposal_health phealth = {}; //----- Ledger status @@ -33,7 +39,7 @@ namespace status void ledger_created(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger) { // If currently not-in-sync, report it as in-sync when a ledger is created. - if (in_sync != 1) + if (in_sync.load() != 1) sync_status_changed(true); std::unique_lock lock(ledger_mutex); @@ -44,8 +50,12 @@ namespace status void sync_status_changed(const bool new_in_sync) { - in_sync = new_in_sync ? 1 : 0; - event_queue.try_enqueue(sync_status_change_event{new_in_sync}); + const int new_value = new_in_sync ? 1 : 0; + if (new_value != in_sync.load()) + { + in_sync = new_value; + event_queue.try_enqueue(sync_status_change_event{new_in_sync}); + } } const util::sequence_hash get_lcl_id() @@ -56,7 +66,7 @@ namespace status const bool is_in_sync() { - return in_sync == 1; + return in_sync.load() == 1; } const ledger::ledger_record get_last_ledger() @@ -93,6 +103,14 @@ namespace status { std::unique_lock lock(peers_mutex); peers = std::move(updated_peers); + + if (peers.size() != peer_count) + { + peer_count = peers.size(); + + if (conf::cfg.health.connectivity_stats) + event_queue.try_enqueue(connectivity_health{peer_count.load(), weakly_connected.load()}); + } } const std::set get_peers() @@ -101,4 +119,90 @@ namespace status return peers; } + const size_t get_peers_count() + { + return peer_count.load(); + } + + void set_weakly_connected(const bool is_weakly_connected) + { + if (weakly_connected.load() != is_weakly_connected) + { + weakly_connected = is_weakly_connected; + + if (conf::cfg.health.connectivity_stats) + event_queue.try_enqueue(connectivity_health{peer_count.load(), weakly_connected.load()}); + } + } + + const bool get_weakly_connected() + { + return weakly_connected.load(); + } + + void set_available_mesh_capacity(const int16_t new_capacity) + { + available_mesh_capacity = new_capacity; + } + + const int16_t get_available_mesh_capacity() + { + return available_mesh_capacity.load(); + } + + //----- Node health + + void report_proposal_batch(const std::list &proposals) + { + if (!conf::cfg.health.proposal_stats) + return; + + phealth.comm_latency_min = UINT64_MAX; + phealth.comm_latency_max = 0; + phealth.comm_latency_avg = 0; + phealth.read_latency_min = UINT64_MAX; + phealth.read_latency_max = 0; + phealth.read_latency_avg = 0; + phealth.batch_size = proposals.size(); + + if (phealth.batch_size == 0) + return; + + const uint64_t now = util::get_epoch_milliseconds(); + uint64_t total_comm_latency = 0; + uint64_t total_read_latency = 0; + + for (const p2p::proposal &p : proposals) + { + const uint64_t comm_latency = (p.sent_timestamp < p.recv_timestamp) ? (p.recv_timestamp - p.sent_timestamp) : 0; + const uint64_t read_latency = now - p.recv_timestamp; + + total_comm_latency += comm_latency; + total_read_latency += read_latency; + + if (comm_latency < phealth.comm_latency_min) + phealth.comm_latency_min = comm_latency; + + if (comm_latency > phealth.comm_latency_max) + phealth.comm_latency_max = comm_latency; + + if (read_latency < phealth.read_latency_min) + phealth.read_latency_min = read_latency; + + if (read_latency > phealth.read_latency_max) + phealth.read_latency_max = read_latency; + } + + phealth.comm_latency_avg = total_comm_latency / phealth.batch_size; + phealth.read_latency_avg = total_read_latency / phealth.batch_size; + } + + void emit_proposal_health() + { + if (!conf::cfg.health.proposal_stats) + return; + + event_queue.try_enqueue(phealth); + } + } // namespace status \ No newline at end of file diff --git a/src/status.hpp b/src/status.hpp index a84bc1e7..12304391 100644 --- a/src/status.hpp +++ b/src/status.hpp @@ -5,6 +5,7 @@ #include "util/sequence_hash.hpp" #include "ledger/ledger_common.hpp" #include "conf.hpp" +#include "p2p/p2p.hpp" namespace status { @@ -23,8 +24,27 @@ namespace status bool in_sync = false; }; + struct proposal_health + { + uint64_t comm_latency_min = 0; + uint64_t comm_latency_max = 0; + uint64_t comm_latency_avg = 0; + uint64_t read_latency_min = 0; + uint64_t read_latency_max = 0; + uint64_t read_latency_avg = 0; + uint64_t batch_size = 0; + }; + + struct connectivity_health + { + size_t peer_count = 0; + bool is_weakly_connected = false; + }; + + typedef std::variant health_event; + // Represents any kind of change that has happened in the node. - typedef std::variant change_event; + typedef std::variant change_event; extern moodycamel::ConcurrentQueue event_queue; @@ -41,6 +61,14 @@ namespace status void set_peers(const std::set &updated_peers); const std::set get_peers(); + const size_t get_peers_count(); + void set_weakly_connected(const bool is_weakly_connected); + const bool get_weakly_connected(); + void set_available_mesh_capacity(const int16_t new_capacity); + const int16_t get_available_mesh_capacity(); + + void report_proposal_batch(const std::list &proposals); + void emit_proposal_health(); } // namespace status diff --git a/src/usr/user_common.hpp b/src/usr/user_common.hpp index f1cad212..38943a9e 100644 --- a/src/usr/user_common.hpp +++ b/src/usr/user_common.hpp @@ -7,7 +7,8 @@ namespace usr enum NOTIFICATION_CHANNEL { UNL_CHANGE = 0, - LEDGER_EVENT = 1 + LEDGER_EVENT = 1, + HEALTH_STAT = 2 }; } // namespace usr diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 6a96c68e..83551cca 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -609,6 +609,24 @@ namespace usr } } } + else if (ev.index() == 3) // Health events. Broadcast for subscribed users. + { + std::scoped_lock lock(ctx.users_mutex); + for (auto &[sid, user] : ctx.users) + { + if (user.subscriptions[NOTIFICATION_CHANNEL::HEALTH_STAT]) + { + std::vector &msg = protocol_msgs[user.protocol]; + if (msg.empty()) // Construct the message with relevant protocol if not done so already. + { + msg::usrmsg::usrmsg_parser parser(user.protocol); + const status::health_event &health_ev = std::get(ev); + parser.create_health_notification(msg, health_ev); + } + user.session.send(msg); + } + } + } } } diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 02fa9ceb..313adec0 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -36,7 +36,7 @@ namespace usr size_t collected_input_size = 0; // User's notification subscription toggles. - bool subscriptions[2]; + bool subscriptions[3]; // Holds the websocket session of this user. // We don't need to own the session object since the lifetime of user and session are coupled. @@ -55,6 +55,7 @@ namespace usr // Default subscriptions. subscriptions[NOTIFICATION_CHANNEL::UNL_CHANGE] = false; subscriptions[NOTIFICATION_CHANNEL::LEDGER_EVENT] = false; + subscriptions[NOTIFICATION_CHANNEL::HEALTH_STAT] = false; } }; diff --git a/src/util/version.hpp b/src/util/version.hpp index 02807fe5..d9d24d31 100644 --- a/src/util/version.hpp +++ b/src/util/version.hpp @@ -6,7 +6,7 @@ namespace version { // Hot Pocket version. Written to new configs and p2p/user messages. - constexpr const char *HP_VERSION = "0.5.0"; + constexpr const char *HP_VERSION = "0.5.1"; // Minimum compatible config version (this will be used to validate configs). constexpr const char *MIN_CONFIG_VERSION = "0.5.0"; diff --git a/test/local-cluster/cluster-create.sh b/test/local-cluster/cluster-create.sh index 834a185b..fedeca11 100755 --- a/test/local-cluster/cluster-create.sh +++ b/test/local-cluster/cluster-create.sh @@ -17,6 +17,7 @@ ncount=$1 loglevel=$2 roundtime=$3 hpcore=$(realpath ../..) +iprange="172.1.1" # Contract can be set with 'export CONTRACT='. Defaults to nodejs echo contract. if [ "$CONTRACT" = "cecho" ]; then # C echo contract @@ -37,6 +38,16 @@ elif [ "$CONTRACT" = "nodefile" ]; then # nodejs file contract (uses BSON protoc binary="/usr/bin/node" binargs="index.js" +elif [ "$CONTRACT" = "diag" ]; then # Diagnostic contract + echo "Using diagnostic contract." + pushd $hpcore/examples/nodejs_contract/ > /dev/null 2>&1 + npm install + npm run build-diag + popd > /dev/null 2>&1 + copyfiles="$hpcore/examples/nodejs_contract/dist/diagnostic-contract/index.js" + binary="/usr/bin/node" + binargs="index.js" + else # nodejs echo contract (default) echo "Using nodejs echo contract." pushd $hpcore/examples/nodejs_contract/ > /dev/null 2>&1 @@ -85,7 +96,7 @@ do # During hosting we use docker virtual dns instead of IP address. # So each node is reachable via 'node' name. - peers[i]="node${n}:${peerport}" + peers[i]="$iprange.${n}:${peerport}" # Update config. node_json=$(node -p "JSON.stringify({...require('./tmp.json').node, \ @@ -214,7 +225,8 @@ popd > /dev/null 2>&1 # Create docker virtual network named 'hpnet' # All nodes will communicate with each other via this network. -docker network create --driver bridge hpnet > /dev/null 2>&1 +docker network rm hpnet > /dev/null 2>&1 +docker network create --driver=bridge --subnet=$iprange.0/24 --gateway=$iprange.254 hpnet > /dev/null 2>&1 echo "Cluster generated at ${clusterloc}" echo "Use \"./cluster-start.sh \" to run each node." diff --git a/test/local-cluster/cluster-start.sh b/test/local-cluster/cluster-start.sh index d632428a..ad9b15f9 100755 --- a/test/local-cluster/cluster-start.sh +++ b/test/local-cluster/cluster-start.sh @@ -21,7 +21,7 @@ let peerport=22860+$n # Mount the node contract directory into hpcore docker container and run. # We specify --network=hpnet so all nodes will communicate via 'hpnet' docker virtual network. # We specify --name for each node so it will be the virtual dns name for each node. -docker run --rm -t -i --network=hpnet --name=node${n} \ +docker run --rm -t -i --network=hpnet --ip=172.1.1.${n} --name=node${n} \ -p ${pubport}:${pubport} \ -p ${peerport}:${peerport} \ --device /dev/fuse --cap-add SYS_ADMIN --security-opt apparmor:unconfined \ diff --git a/test/vm-cluster/.gitignore b/test/vm-cluster/.gitignore index 803f22dc..a4b57333 100644 --- a/test/vm-cluster/.gitignore +++ b/test/vm-cluster/.gitignore @@ -1,4 +1,5 @@ cfg config.json hpfiles -node_modules \ No newline at end of file +node_modules +*.log \ No newline at end of file diff --git a/test/vm-cluster/stream.js b/test/vm-cluster/stream.js index 05327274..cfd56465 100644 --- a/test/vm-cluster/stream.js +++ b/test/vm-cluster/stream.js @@ -1,6 +1,6 @@ const HotPocket = require('../../examples/js_client/lib/hp-client-lib'); const azure = require('azure-storage'); -const fs = require('fs'); +const fs = require('fs').promises; const https = require('https'); const fetch = require('node-fetch'); @@ -10,6 +10,8 @@ const metricsTrackInterval = process.env.METRICSTRACK || 10000; const backoffDelayMax = process.env.BACKOFFMAX || 60000; const eventsBatchSize = process.env.EVENTBATCH || 20; const stateBatchSize = process.env.STATEBATCH || 20; +const synclog = process.env.SYNCLOG || "off"; +const healthlog = process.env.HEALTHLOG || "off"; let keys = null; let vultrApiKey = null; @@ -27,7 +29,7 @@ async function main() { console.log('My public key is: ' + pkhex); // Load cluster config. - const config = JSON.parse(fs.readFileSync("config.json")); + const config = JSON.parse(await fs.readFile("config.json")); vultrApiKey = config.vultr.api_key; // Create Azure table service. @@ -200,13 +202,37 @@ async function establishClientConnection(node) { reportEvent(node, ev); }); + // This will get fired when any diagnostic health event occurs. + if (healthlog === "on") { + hpc.on(HotPocket.events.healthEvent, async (ev) => { + + const now = new Date().toUTCString(); + if (ev.event === "proposal") { + delete ev.event; + const str = JSON.stringify(ev); + await fs.appendFile("prop_health.log", `${now}, Node${node.idx}, ${node.uri}, ${node.status}, ${str}\n`); + } + else if (ev.event === "connectivity") { + delete ev.event; + const str = JSON.stringify(ev); + await fs.appendFile("conn_health.log", `${now}, Node${node.idx}, ${node.uri}, ${node.status}, ${str}\n`); + } + }); + + await hpc.subscribe(HotPocket.notificationChannels.healthEvent); + } + // Establish HotPocket connection. if (!await hpc.connect()) { onConnectionFail(node); } else { + + const stat = await hpc.getStatus(); + const lastLedger = await hpc.getLedgerBySeqNo(stat.ledgerSeqNo); + node.failureCount = 0; - reportEvent(node, { event: "online" }); + reportEvent(node, { event: "online", ledger: lastLedger }); await hpc.subscribe(HotPocket.notificationChannels.ledgerEvent); } } @@ -248,9 +274,13 @@ async function reportEvent(node, ev) { } else if (ev.event == 'sync_status') { node.status = ev.inSync ? 'in_sync' : 'desync'; + + if (synclog == "on") + await fs.appendFile("sync_ops.log", `${new Date(ts).toUTCString()}, Node${node.idx}, ${node.uri}, ${node.status}, at ${node.lastLedger.seqNo}\n`); } else if (ev.event == 'online') { node.status = 'online'; + node.lastLedger = ev.ledger; } else if (ev.event == 'offline') { node.status = 'offline'; diff --git a/test/vm-cluster/vultr.sh b/test/vm-cluster/vultr.sh index 68e1c506..56fbeba0 100755 --- a/test/vm-cluster/vultr.sh +++ b/test/vm-cluster/vultr.sh @@ -11,7 +11,7 @@ planid="vc2-1c-1gb" # $5/month osid=387 # Ubuntu 20.04 # Order of Vultr regions to distribute servers across the globe. -regions=("nrt" "syd" "fra" "yto" "icn" "cdg" "atl" "sgp" "lhr" "ord" "ams" "nrt" "dfw" "syd" "fra" "lax" "icn" "syd" "cdg" "mia" "sgp" "syd" "lhr" "ewr" "nrt" "syd" "fra" "sea" "icn" "syd" "cdg" "sjc") +regions=("syd" "yto" "ams" "atl" "cdg" "dfw" "ewr" "fra" "icn" "lax" "lhr" "mia" "nrt" "ord" "sea" "sgp" "sjc") # jq command is used for json manipulation. if ! command -v jq &> /dev/null