From cc11ebd7b3d49efdd65d966dea546344857e89b9 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Mon, 7 Jun 2021 10:24:58 +0530 Subject: [PATCH] Event subscription infrastructure and ledger events. (#319) - Introduced event subscription infrastructure for users. - Added ability to subscribe to ledger events. - Updated client lib for event subscription support. --- examples/js_client/browser-example.html | 20 ++- examples/js_client/lib/hp-client-lib.js | 165 +++++++++++++------ examples/js_client/text-client.js | 20 ++- src/consensus.cpp | 9 +- src/msg/bson/usrmsg_bson.cpp | 142 ++++++++++++++--- src/msg/bson/usrmsg_bson.hpp | 11 +- src/msg/json/usrmsg_json.cpp | 204 ++++++++++++++++++------ src/msg/json/usrmsg_json.hpp | 11 +- src/msg/usrmsg_common.hpp | 12 ++ src/msg/usrmsg_parser.cpp | 30 +++- src/msg/usrmsg_parser.hpp | 12 +- src/status.cpp | 23 ++- src/status.hpp | 13 +- src/usr/user_common.hpp | 15 ++ src/usr/usr.cpp | 57 +++++-- src/usr/usr.hpp | 7 + 16 files changed, 599 insertions(+), 152 deletions(-) create mode 100644 src/usr/user_common.hpp diff --git a/examples/js_client/browser-example.html b/examples/js_client/browser-example.html index fb127588..c035bfff 100644 --- a/examples/js_client/browser-example.html +++ b/examples/js_client/browser-example.html @@ -52,11 +52,6 @@ console.log(server + " " + action); }) - // This will get when the unl public key list changes. - hpc.on(HotPocket.events.unlChange, (unl) => { - console.log("New unl received: " + JSON.stringify(unl)); // unl is an array of hex public keys. - }) - // This will get fired when contract sends an output. hpc.on(HotPocket.events.contractOutput, (r) => { r.outputs.forEach(o => console.log(`Output (ledger:${r.ledgerSeqNo})>> ${o}`)); @@ -70,6 +65,17 @@ console.log("Read response>> " + response); }) + // This will get fired when the unl public key list changes. + hpc.on(HotPocket.events.unlChange, (unl) => { + console.log("New unl received:"); + console.log(unl); // unl is an array of public keys. + }) + + // This will get fired when any ledger event occurs (ledger created, sync status change). + hpc.on(HotPocket.events.ledgerEvent, (ev) => { + console.log(ev); + }) + // Establish HotPocket connection. if (!await hpc.connect()) { console.log('Connection failed.'); @@ -77,6 +83,10 @@ } console.log('HotPocket Connected.'); + // After connecting, we can subscribe to events from the HotPocket node. + // await hpc.subscribe(HotPocket.notificationChannels.unlChange); + // await hpc.subscribe(HotPocket.notificationChannels.ledgerEvent); + hpc.sendContractReadRequest("Hello"); const input = await hpc.submitContractInput("World!") diff --git a/examples/js_client/lib/hp-client-lib.js b/examples/js_client/lib/hp-client-lib.js index ec9a779f..ec9d07b4 100644 --- a/examples/js_client/lib/hp-client-lib.js +++ b/examples/js_client/lib/hp-client-lib.js @@ -47,13 +47,21 @@ /*--- Included in public interface. ---*/ const events = { disconnect: "disconnect", - contractOutput: "contractOutput", - contractReadResponse: "contractReadResponse", - connectionChange: "connectionChange", - unlChange: "unlChange" + contractOutput: "contract_output", + contractReadResponse: "contract_read_response", + connectionChange: "connection_change", + unlChange: "unl_change", + ledgerEvent: "ledger_event" } Object.freeze(events); + /*--- Included in public interface. ---*/ + const notificationChannels = { + unlChange: "unl_change", + ledgerEvent: "ledger_event" + } + Object.freeze(notificationChannels); + /*--- Included in public interface. ---*/ // privateKeyHex: Hex private key with prefix ('ed'). // Returns 'ed' (237) prefixed binary public/private keys. @@ -143,6 +151,8 @@ if (key.length > 0) trustedKeysLookup[key] = true }); + // If there are no keys specified, mark the lookup as null, indicating that we are not intersted in + // checking for trusted servers. if (Object.keys(trustedKeysLookup).length == 0) trustedKeysLookup = null; @@ -153,15 +163,9 @@ let emitter = new EventEmitter(); - // The accessor function passed into connections to query latest trusted key list. - // We update the returning key list whenever we get a unl update. + // The accessor functions passed into connections to query and set latest trusted key list. const getTrustedKeys = () => trustedKeysLookup; - - // Whenever unl change is reported, update the trusted key list. - emitter.on(events.unlChange, (unl) => { - trustedKeysLookup = {}; - unl.sort().forEach(pubkey => trustedKeysLookup[pubkey] = true); - }) + const setTrustedKeys = (newKeys) => (trustedKeysLookup = newKeys); const nodes = Object.keys(serversLookup).map(s => { return { @@ -171,6 +175,12 @@ } }); + // Default subscriptions. + const subscriptions = {}; + // Subscribe for unl changes if we have to maintain the trusted server key checks. + subscriptions[notificationChannels.unlChange] = trustedKeysLookup ? true : false; + subscriptions[notificationChannels.ledgerEvent] = false; + let status = 0; //0:none, 1:connected, 2:closed // This will get fired whenever the required connection count gets fullfilled. @@ -241,14 +251,24 @@ // Get the next available node. const n = freeNodes.shift(); - n.connection = new HotPocketConnection(contractId, contractVersion, clientKeys, n.server, getTrustedKeys, protocol, connectionTimeoutMs, emitter); + n.connection = new HotPocketConnection( + contractId, contractVersion, clientKeys, n.server, + getTrustedKeys, setTrustedKeys, protocol, connectionTimeoutMs, emitter); n.lastActivity = new Date().getTime(); n.connection.connect().then(success => { - if (!success) + if (!success) { n.connection = null; - else + } + else { emitter && emitter.emit(events.connectionChange, n.server, "add"); + + // Issue subscription request for any subscriptions we have to maintain. + // We don't wait for completion because we just need to issue the request to the server. + for (const [channel, enabled] of Object.entries(subscriptions)) { + if (enabled) n.connection.subscribe(channel) + } + } }); n.connection.onClose = () => { @@ -360,12 +380,24 @@ return getMultiConnectionResult(con => con.getLcl()); } + this.subscribe = (channel) => { + subscriptions[channel] = true; + return executeMultiConnectionFunc(con => con.subscribe(channel)); + } + + this.unsubscribe = (channel) => { + subscriptions[channel] = false; + return executeMultiConnectionFunc(con => con.unsubscribe(channel)); + } + this.getLedgerBySeqNo = (seqNo, includeInputs, includeOutputs) => { return getMultiConnectionResult(con => con.getLedgerBySeqNo(seqNo, includeInputs, includeOutputs)); } } - function HotPocketConnection(contractId, contractVersion, clientKeys, server, getTrustedKeys, protocol, connectionTimeoutMs, emitter) { + function HotPocketConnection( + contractId, contractVersion, clientKeys, server, + getTrustedKeys, setTrustedKeys, protocol, connectionTimeoutMs, emitter) { // Create message helper with JSON protocol initially. // After challenge handshake, we will change this to use the protocol specified by user. @@ -491,18 +523,24 @@ return false; } - const validateAndEmitUnlChange = (changedUnl) => { - // If this is currently a trusted connection, notify unl update. - const trustedKeys = getTrustedKeys(); - if (trustedKeys && trustedKeys[pubkey]) { - // Prepare sorted new unl lookup object for equality comparison. - const newUnl = {}; - changedUnl.sort().forEach(k => newUnl[k] = true); + const processUnlUpdate = (unl, isHandshake) => { - // Only emit unl change event if the unl has really changed. - if (JSON.stringify(trustedKeys) != JSON.stringify(newUnl)) - emitter && emitter.emit(events.unlChange, changedUnl); + unl = unl.map(k => msgHelper.deserializeValue(k)).sort(); + + // If this is currently a trusted connection, update the trusted key set with the received unl. + let trustedKeys = getTrustedKeys(); + if (trustedKeys && trustedKeys[pubkey]) { + trustedKeys = {}; // reset the object and reinitialize the list. + + // Convert unl pubkeys to hex string so we can use them as lookup object keys. + const hexUnl = unl.map(k => msgHelper.stringifyValue(k)); + hexUnl.forEach(k => trustedKeys[k] = true); + setTrustedKeys(trustedKeys); + liblog(0, "Updated trusted keys."); } + + if (!isHandshake) + emitter && emitter.emit(events.unlChange, unl); } const handshakeMessageHandler = (m) => { @@ -556,17 +594,17 @@ clearTimeout(handshakeTimer); // Cancel the handshake timeout monitor. handshakeTimer = null; serverChallenge = null; // Clear the sent challenge as we no longer need it. - msgHelper.useProtocol(protocol); // Here onwards, use the message protocol specified by user. pubkey = m.pubkey; // Set this connection's public key. connectionStatus = 2; // Handshake complete. + processUnlUpdate(m.unl, true); + msgHelper.useProtocol(protocol); // Here onwards, use the message protocol specified by user. + // If we are still connected, report handshaking as successful. // (If websocket disconnects, handshakeResolver will be already null) handshakeResolver && handshakeResolver(true); liblog(0, `Connected to ${server}`); - validateAndEmitUnlChange(m.unl); - return true; } @@ -621,6 +659,7 @@ hpVersion: m.hp_version, ledgerSeqNo: m.ledger_seq_no, ledgerHash: msgHelper.deserializeValue(m.ledger_hash), + inSync: m.in_sync, roundTime: m.round_time, contractExecutionEnabled: m.contract_execution_enabled, readRequestsEnabled: m.read_requests_enabled, @@ -641,27 +680,21 @@ lclResponseResolvers = []; } else if (m.type == "unl_change") { - if (m.unl) { - // Convert unl pubkeys to hex string. - let unl = m.unl.map(k => msgHelper.stringifyValue(k)); - validateAndEmitUnlChange(unl); - } + processUnlUpdate(m.unl, false); + } + else if (m.type == "ledger_event") { + const ev = { event: m.event }; + if (ev.event == "ledger_created") + ev.ledger = msgHelper.deserializeLedger(m.ledger); + else if (ev.event == "sync_status") + ev.inSync = m.in_sync; + emitter.emit(events.ledgerEvent, ev); } else if (m.type == "ledger_query_result") { const resolver = ledgerQueryResolvers[m.reply_for]; if (resolver) { const results = m.results.map(r => { - const result = { - seqNo: r.seq_no, - timestamp: r.timestamp, - hash: msgHelper.deserializeValue(r.hash), - prevHash: msgHelper.deserializeValue(r.prev_hash), - stateHash: msgHelper.deserializeValue(r.state_hash), - configHash: msgHelper.deserializeValue(r.config_hash), - userHash: msgHelper.deserializeValue(r.user_hash), - inputHash: msgHelper.deserializeValue(r.input_hash), - outputHash: msgHelper.deserializeValue(r.output_hash) - } + const result = msgHelper.deserializeLedger(r); if (r.inputs) { result.inputs = r.inputs.map(i => { @@ -912,6 +945,24 @@ return Promise.resolve(); } + this.subscribe = (channel) => { + if (connectionStatus != 2) + return Promise.resolve(); + + const msg = msgHelper.createSubscriptionRequest(channel, true); + wsSend(msgHelper.serializeObject(msg)); + return Promise.resolve(); + } + + this.unsubscribe = (channel) => { + if (connectionStatus != 2) + return Promise.resolve(); + + const msg = msgHelper.createSubscriptionRequest(channel, false); + wsSend(msgHelper.serializeObject(msg)); + return Promise.resolve(); + } + this.getLedgerBySeqNo = (seqNo, includeInputs, includeOutputs) => { if (connectionStatus != 2) return Promise.resolve(null); @@ -1054,6 +1105,14 @@ return { type: "lcl" }; } + this.createSubscriptionRequest = (channel, enabled) => { + return { + type: "subscription", + channel: channel, + enabled: enabled + } + } + this.createLedgerQuery = (filterBy, params, includeInputs, includeOutputs) => { const includes = []; @@ -1068,6 +1127,20 @@ include: includes } } + + this.deserializeLedger = (l) => { + return { + seqNo: l.seq_no, + timestamp: l.timestamp, + hash: this.deserializeValue(l.hash), + prevHash: this.deserializeValue(l.prev_hash), + stateHash: this.deserializeValue(l.state_hash), + configHash: this.deserializeValue(l.config_hash), + userHash: this.deserializeValue(l.user_hash), + inputHash: this.deserializeValue(l.input_hash), + outputHash: this.deserializeValue(l.output_hash) + } + } } function hexToUint8Array(hexString) { @@ -1245,6 +1318,7 @@ generateKeys, createClient, events, + notificationChannels, protocols, setLogLevel }; @@ -1254,6 +1328,7 @@ generateKeys, createClient, events, + notificationChannels, protocols, setLogLevel }; diff --git a/examples/js_client/text-client.js b/examples/js_client/text-client.js index 0e3fdc01..09431412 100644 --- a/examples/js_client/text-client.js +++ b/examples/js_client/text-client.js @@ -52,11 +52,6 @@ async function main() { console.log(server + " " + action); }) - // This will get when the unl public key list changes. - hpc.on(HotPocket.events.unlChange, (unl) => { - console.log("New unl received: " + JSON.stringify(unl)); // unl is an array of hex public keys. - }) - // This will get fired when contract sends outputs. hpc.on(HotPocket.events.contractOutput, (r) => { r.outputs.forEach(o => console.log(`Output (ledger:${r.ledgerSeqNo})>> ${o}`)); @@ -67,6 +62,17 @@ async function main() { console.log("Read response>> " + response); }) + // This will get fired when the unl public key list changes. + hpc.on(HotPocket.events.unlChange, (unl) => { + console.log("New unl received:"); + console.log(unl); // unl is an array of public keys. + }) + + // This will get fired when any ledger event occurs (ledger created, sync status change). + hpc.on(HotPocket.events.ledgerEvent, (ev) => { + console.log(ev); + }) + // Establish HotPocket connection. if (!await hpc.connect()) { console.log('Connection failed.'); @@ -74,6 +80,10 @@ async function main() { } console.log('HotPocket Connected.'); + // After connecting, we can subscribe to events from the HotPocket node. + // await hpc.subscribe(HotPocket.notificationChannels.unlChange); + // await hpc.subscribe(HotPocket.notificationChannels.ledgerEvent); + // start listening for stdin const rl = readline.createInterface({ input: process.stdin, diff --git a/src/consensus.cpp b/src/consensus.cpp index 5b37bb8b..b3018591 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -157,9 +157,10 @@ namespace consensus new_sync_status = check_sync_status(unl_count, votes, lcl_id); } - // Update the status if the sync status changed. - if ((ctx.sync_status != 0 && new_sync_status == 0) || (ctx.sync_status == 0 && new_sync_status != 0)) - status::sync_status_changed(new_sync_status == 0); + // Update the sync status if we went from in-sync to not-in-sync. We will report back as being in-sync + // only when we hit stage 3. + if (ctx.sync_status == 0 && new_sync_status != 0) + status::sync_status_changed(false); ctx.sync_status = new_sync_status; } @@ -187,6 +188,8 @@ namespace consensus // Upon successful consensus at stage 3, update the ledger and execute the contract using the consensus proposal. if (ctx.stage == 3) { + status::sync_status_changed(true); // Creating a new ledger means we are in sync. + consensed_user_map consensed_users; if (prepare_consensed_users(consensed_users, p) == -1 || commit_consensus_results(p, consensed_users, patch_hash) == -1) diff --git a/src/msg/bson/usrmsg_bson.cpp b/src/msg/bson/usrmsg_bson.cpp index b9ac3c23..f053ae55 100644 --- a/src/msg/bson/usrmsg_bson.cpp +++ b/src/msg/bson/usrmsg_bson.cpp @@ -32,6 +32,7 @@ namespace msg::usrmsg::bson { const util::sequence_hash lcl_id = status::get_lcl_id(); const std::set unl = status::get_unl(); + const bool in_sync = status::is_in_sync(); jsoncons::bson::bson_bytes_encoder encoder(msg); encoder.begin_object(); @@ -43,6 +44,8 @@ namespace msg::usrmsg::bson encoder.int64_value(lcl_id.seq_no); encoder.key(msg::usrmsg::FLD_LEDGER_HASH); encoder.byte_string_value(lcl_id.hash.to_string_view()); + encoder.key(msg::usrmsg::FLD_IN_SYNC); + encoder.bool_value(in_sync); encoder.key(msg::usrmsg::FLD_ROUND_TIME); encoder.uint64_value(conf::cfg.contract.roundtime); encoder.key(msg::usrmsg::FLD_CONTARCT_EXECUTION_ENABLED); @@ -236,7 +239,7 @@ namespace msg::usrmsg::bson } /** - * Constructs unl list container message. + * Constructs unl change notification message. * @param msg Buffer to construct the generated bson message string into. * Message format: * { @@ -245,7 +248,7 @@ namespace msg::usrmsg::bson * } * @param unl_list The unl node pubkey list to be put in the message. */ - void create_unl_list_container(std::vector &msg, const ::std::set &unl_list) + void create_unl_notification(std::vector &msg, const ::std::set &unl_list) { jsoncons::bson::bson_bytes_encoder encoder(msg); encoder.begin_object(); @@ -260,9 +263,61 @@ namespace msg::usrmsg::bson encoder.flush(); } + /** + * Constructs ledger created notification message. + * @param msg Buffer to construct the generated bson message string into. + * Message format: + * { + * "type": "ledger_event", + * "event": "ledger_created", + * "ledger": { ... } + * } + * @param ledger The created ledger. + */ + void create_ledger_created_notification(std::vector &msg, const ledger::ledger_record &ledger) + { + jsoncons::bson::bson_bytes_encoder encoder(msg); + encoder.begin_object(); + encoder.key(msg::usrmsg::FLD_TYPE); + encoder.string_value(msg::usrmsg::MSGTYPE_LEDGER_EVENT); + encoder.key(msg::usrmsg::FLD_EVENT); + encoder.string_value(msg::usrmsg::LEDGER_EVENT_LEDGER_CREATED); + encoder.key(msg::usrmsg::FLD_LEDGER); + encoder.begin_object(); + populate_ledger_fields(encoder, ledger); + encoder.end_object(); + encoder.end_object(); + encoder.flush(); + } + + /** + * Constructs sync status notification message. + * @param msg Buffer to construct the generated bson message string into. + * Message format: + * { + * "type": "ledger_event", + * "event": "sync_status", + * "in_sync": true | false + * } + * @param in_sync Whether the node is in sync or not. + */ + void create_sync_status_notification(std::vector &msg, const bool in_sync) + { + jsoncons::bson::bson_bytes_encoder encoder(msg); + encoder.begin_object(); + encoder.key(msg::usrmsg::FLD_TYPE); + encoder.string_value(msg::usrmsg::MSGTYPE_LEDGER_EVENT); + encoder.key(msg::usrmsg::FLD_EVENT); + encoder.string_value(msg::usrmsg::LEDGER_EVENT_SYNC_STATUS); + encoder.key(msg::usrmsg::FLD_IN_SYNC); + encoder.bool_value(in_sync); + encoder.end_object(); + encoder.flush(); + } + /** * Constructs a ledger query response. - * @param msg Buffer to construct the generated json message string into. + * @param msg Buffer to construct the generated bson message string into. * Message format: * { * "type": "ledger_query_result", @@ -448,6 +503,46 @@ namespace msg::usrmsg::bson return 0; } + /** + * Extract ledger event subscription request. + * @param channel Extracted subscription channel. + * @param enabled Whether the subscription is enabled or not. + * @param d The json document holding the subscription request. + * Accepted message format: + * { + * "type": "subscription", + * "channel": "unl_change" | "ledger_event", + * "enabled": true | false + * } + * @return 0 on successful extraction. -1 for failure. + */ + int extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled, const jsoncons::ojson &d) + { + if (!d.contains(msg::usrmsg::FLD_CHANNEL) || !d.contains(msg::usrmsg::FLD_ENABLED) || + !d[msg::usrmsg::FLD_CHANNEL].is() || !d[msg::usrmsg::FLD_ENABLED].is()) + { + LOG_DEBUG << "User subscription request required fields missing or invalid."; + return -1; + } + + if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_LEDGER_EVENT) + { + channel = usr::NOTIFICATION_CHANNEL::LEDGER_EVENT; + } + else if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_UNL_CHANGE) + { + channel = usr::NOTIFICATION_CHANNEL::UNL_CHANGE; + } + else + { + LOG_DEBUG << "User subscription request invalid channel."; + return -1; + } + + enabled = d[msg::usrmsg::FLD_ENABLED].as(); + return 0; + } + /** * Extract query information from a ledger query request. * @param extracted_query Extracted query criteria. @@ -547,24 +642,7 @@ namespace msg::usrmsg::bson for (const ledger::ledger_record &ledger : results) { encoder.begin_object(); - encoder.key(msg::usrmsg::FLD_SEQ_NO); - encoder.uint64_value(ledger.seq_no); - encoder.key(msg::usrmsg::FLD_TIMESTAMP); - encoder.uint64_value(ledger.timestamp); - encoder.key(msg::usrmsg::FLD_HASH); - encoder.byte_string_value(ledger.ledger_hash); - encoder.key(msg::usrmsg::FLD_PREV_HASH); - encoder.byte_string_value(ledger.prev_ledger_hash); - encoder.key(msg::usrmsg::FLD_STATE_HASH); - encoder.byte_string_value(ledger.state_hash); - encoder.key(msg::usrmsg::FLD_CONFIG_HASH); - encoder.byte_string_value(ledger.config_hash); - encoder.key(msg::usrmsg::FLD_USER_HASH); - encoder.byte_string_value(ledger.user_hash); - encoder.key(msg::usrmsg::FLD_INPUT_HASH); - encoder.byte_string_value(ledger.input_hash); - encoder.key(msg::usrmsg::FLD_OUTPUT_HASH); - encoder.byte_string_value(ledger.output_hash); + populate_ledger_fields(encoder, ledger); // If raw inputs or outputs is not requested, we don't include that field at all in the response. // Otherwise the field will always contain an array (empty array if no data). @@ -585,6 +663,28 @@ namespace msg::usrmsg::bson } } + void populate_ledger_fields(jsoncons::bson::bson_bytes_encoder &encoder, const ledger::ledger_record &ledger) + { + encoder.key(msg::usrmsg::FLD_SEQ_NO); + encoder.uint64_value(ledger.seq_no); + encoder.key(msg::usrmsg::FLD_TIMESTAMP); + encoder.uint64_value(ledger.timestamp); + encoder.key(msg::usrmsg::FLD_HASH); + encoder.byte_string_value(ledger.ledger_hash); + encoder.key(msg::usrmsg::FLD_PREV_HASH); + encoder.byte_string_value(ledger.prev_ledger_hash); + encoder.key(msg::usrmsg::FLD_STATE_HASH); + encoder.byte_string_value(ledger.state_hash); + encoder.key(msg::usrmsg::FLD_CONFIG_HASH); + encoder.byte_string_value(ledger.config_hash); + encoder.key(msg::usrmsg::FLD_USER_HASH); + encoder.byte_string_value(ledger.user_hash); + encoder.key(msg::usrmsg::FLD_INPUT_HASH); + encoder.byte_string_value(ledger.input_hash); + encoder.key(msg::usrmsg::FLD_OUTPUT_HASH); + encoder.byte_string_value(ledger.output_hash); + } + void populate_ledger_inputs(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &inputs) { encoder.begin_array(); diff --git a/src/msg/bson/usrmsg_bson.hpp b/src/msg/bson/usrmsg_bson.hpp index 29a18167..64f74bef 100644 --- a/src/msg/bson/usrmsg_bson.hpp +++ b/src/msg/bson/usrmsg_bson.hpp @@ -4,6 +4,7 @@ #include "../../pchheader.hpp" #include "../../util/merkle_hash_tree.hpp" #include "../../ledger/ledger_query.hpp" +#include "../../usr/user_common.hpp" namespace msg::usrmsg::bson { @@ -21,7 +22,11 @@ namespace msg::usrmsg::bson const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, const uint64_t lcl_seq_no, std::string_view lcl_hash); - void create_unl_list_container(std::vector &msg, const ::std::set &unl_list); + void create_unl_notification(std::vector &msg, const ::std::set &unl_list); + + void create_ledger_created_notification(std::vector &msg, const ledger::ledger_record &ledger); + + void create_sync_status_notification(std::vector &msg, const bool in_sync); void create_ledger_query_response(std::vector &msg, std::string_view reply_for, const ledger::query::query_result &result); @@ -41,12 +46,16 @@ namespace msg::usrmsg::bson int extract_input_container(std::string &input, uint64_t &nonce, uint64_t &max_ledger_seq_no, std::string_view contentbson); + int extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled, const jsoncons::ojson &d); + int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id, const jsoncons::ojson &d); void populate_output_hash_array(jsoncons::bson::bson_bytes_encoder &encoder, const util::merkle_hash_node &node); void populate_ledger_query_results(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &results); + void populate_ledger_fields(jsoncons::bson::bson_bytes_encoder &encoder, const ledger::ledger_record &ledger); + void populate_ledger_inputs(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &inputs); void populate_ledger_outputs(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &users); diff --git a/src/msg/json/usrmsg_json.cpp b/src/msg/json/usrmsg_json.cpp index c881f87a..5f9656d7 100644 --- a/src/msg/json/usrmsg_json.cpp +++ b/src/msg/json/usrmsg_json.cpp @@ -151,10 +151,9 @@ namespace msg::usrmsg::json { const util::sequence_hash lcl_id = status::get_lcl_id(); const std::set unl = status::get_unl(); + const bool in_sync = status::is_in_sync(); - const uint16_t msg_length = 406 + (69 * unl.size()); - - msg.reserve(msg_length); + msg.reserve(1024); msg += "{\""; msg += msg::usrmsg::FLD_TYPE; msg += SEP_COLON; @@ -172,21 +171,25 @@ namespace msg::usrmsg::json msg += SEP_COLON; msg += util::to_hex(lcl_id.hash.to_string_view()); msg += SEP_COMMA; + msg += msg::usrmsg::FLD_IN_SYNC; + msg += SEP_COLON_NOQUOTE; + msg += in_sync ? STR_TRUE : STR_FALSE; + msg += SEP_COMMA_NOQUOTE; msg += msg::usrmsg::FLD_ROUND_TIME; msg += SEP_COLON_NOQUOTE; msg += std::to_string(conf::cfg.contract.roundtime); msg += SEP_COMMA_NOQUOTE; msg += msg::usrmsg::FLD_CONTARCT_EXECUTION_ENABLED; msg += SEP_COLON_NOQUOTE; - msg += conf::cfg.contract.execute ? "true" : "false"; + msg += conf::cfg.contract.execute ? STR_TRUE : STR_FALSE; msg += SEP_COMMA_NOQUOTE; msg += msg::usrmsg::FLD_READ_REQUESTS_ENABLED; msg += SEP_COLON_NOQUOTE; - msg += conf::cfg.user.concurrent_read_reqeuests != 0 ? "true" : "false"; + msg += conf::cfg.user.concurrent_read_reqeuests != 0 ? STR_TRUE : STR_FALSE; msg += SEP_COMMA_NOQUOTE; msg += msg::usrmsg::FLD_IS_FULL_HISTORY_NODE; msg += SEP_COLON_NOQUOTE; - msg += conf::cfg.node.history == conf::HISTORY::FULL ? "true" : "false"; + msg += conf::cfg.node.history == conf::HISTORY::FULL ? STR_TRUE : STR_FALSE; msg += SEP_COMMA_NOQUOTE; msg += msg::usrmsg::FLD_CURRENT_UNL; msg += SEP_COLON_NOQUOTE; @@ -451,7 +454,7 @@ namespace msg::usrmsg::json } /** - * Constructs unl list container message. + * Constructs unl change notification message. * @param msg Buffer to construct the generated json message string into. * Message format: * { @@ -460,7 +463,7 @@ namespace msg::usrmsg::json * } * @param unl_list The unl node pubkey list to be put in the message. */ - void create_unl_list_container(std::vector &msg, const ::std::set &unl_list) + void create_unl_notification(std::vector &msg, const ::std::set &unl_list) { msg.reserve((69 * unl_list.size()) + 30); msg += "{\""; @@ -485,6 +488,64 @@ namespace msg::usrmsg::json msg += "]}"; } + /** + * Constructs ledger created notification message. + * @param msg Buffer to construct the generated json message string into. + * Message format: + * { + * "type": "ledger_event", + * "event": "ledger_created", + * "ledger": { ... } + * } + * @param ledger The created ledger. + */ + void create_ledger_created_notification(std::vector &msg, const ledger::ledger_record &ledger) + { + msg.reserve(1024); + msg += "{\""; + msg += msg::usrmsg::FLD_TYPE; + msg += SEP_COLON; + msg += msg::usrmsg::MSGTYPE_LEDGER_EVENT; + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_EVENT; + msg += SEP_COLON; + msg += msg::usrmsg::LEDGER_EVENT_LEDGER_CREATED; + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_LEDGER; + msg += "\":{"; + populate_ledger_fields(msg, ledger); + msg += "}}"; + } + + /** + * Constructs sync status notification message. + * @param msg Buffer to construct the generated json message string into. + * Message format: + * { + * "type": "ledger_event", + * "event": "sync_status", + * "in_sync": true | false + * } + * @param in_sync Whether the node is in sync or not. + */ + void create_sync_status_notification(std::vector &msg, const bool in_sync) + { + msg.reserve(128); + msg += "{\""; + msg += msg::usrmsg::FLD_TYPE; + msg += SEP_COLON; + msg += msg::usrmsg::MSGTYPE_LEDGER_EVENT; + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_EVENT; + msg += SEP_COLON; + msg += msg::usrmsg::LEDGER_EVENT_SYNC_STATUS; + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_IN_SYNC; + msg += SEP_COLON_NOQUOTE; + msg += in_sync ? STR_TRUE : STR_FALSE; + msg += "}"; + } + /** * Constructs a ledger query response. * @param msg Buffer to construct the generated json message string into. @@ -796,6 +857,51 @@ namespace msg::usrmsg::json return 0; } + /** + * Extract ledger event subscription request. + * @param channel Extracted subscription channel. + * @param enabled Whether the subscription is enabled or not. + * @param d The json document holding the subscription request. + * Accepted message format: + * { + * "type": "subscription", + * "channel": "unl_change" | "ledger_event", + * "enabled": true | false + * } + * @return 0 on successful extraction. -1 for failure. + */ + int extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled, const jsoncons::json &d) + { + if (!d.contains(msg::usrmsg::FLD_CHANNEL) || !d.contains(msg::usrmsg::FLD_ENABLED)) + { + LOG_DEBUG << "User subscription request required fields missing."; + return -1; + } + + if (!d[msg::usrmsg::FLD_CHANNEL].is() || !d[msg::usrmsg::FLD_ENABLED].is()) + { + LOG_DEBUG << "User subscription request invalid field values."; + return -1; + } + + if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_LEDGER_EVENT) + { + channel = usr::NOTIFICATION_CHANNEL::LEDGER_EVENT; + } + else if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_UNL_CHANGE) + { + channel = usr::NOTIFICATION_CHANNEL::UNL_CHANGE; + } + else + { + LOG_DEBUG << "User subscription request invalid channel."; + return -1; + } + + enabled = d[msg::usrmsg::FLD_ENABLED].as(); + return 0; + } + /** * Extract query information from a ledger query request. * @param extracted_query Extracted query criteria. @@ -880,7 +986,7 @@ namespace msg::usrmsg::json if ((first == '\"' && last == '\"') || (first == '{' && last == '}') || (first == '[' && last == ']') || - content == "true" || content == "false") + content == STR_TRUE || content == STR_FALSE) return false; // Check whether all characters are digits. @@ -933,43 +1039,8 @@ namespace msg::usrmsg::json { const ledger::ledger_record &ledger = results[i]; - msg += "{\""; - msg += msg::usrmsg::FLD_SEQ_NO; - msg += SEP_COLON_NOQUOTE; - msg += std::to_string(ledger.seq_no); - msg += SEP_COMMA_NOQUOTE; - msg += msg::usrmsg::FLD_TIMESTAMP; - msg += SEP_COLON_NOQUOTE; - msg += std::to_string(ledger.timestamp); - msg += SEP_COMMA_NOQUOTE; - msg += msg::usrmsg::FLD_HASH; - msg += SEP_COLON; - msg += util::to_hex(ledger.ledger_hash); - msg += SEP_COMMA; - msg += msg::usrmsg::FLD_PREV_HASH; - msg += SEP_COLON; - msg += util::to_hex(ledger.prev_ledger_hash); - msg += SEP_COMMA; - msg += msg::usrmsg::FLD_STATE_HASH; - msg += SEP_COLON; - msg += util::to_hex(ledger.state_hash); - msg += SEP_COMMA; - msg += msg::usrmsg::FLD_CONFIG_HASH; - msg += SEP_COLON; - msg += util::to_hex(ledger.config_hash); - msg += SEP_COMMA; - msg += msg::usrmsg::FLD_USER_HASH; - msg += SEP_COLON; - msg += util::to_hex(ledger.user_hash); - msg += SEP_COMMA; - msg += msg::usrmsg::FLD_INPUT_HASH; - msg += SEP_COLON; - msg += util::to_hex(ledger.input_hash); - msg += SEP_COMMA; - msg += msg::usrmsg::FLD_OUTPUT_HASH; - msg += SEP_COLON; - msg += util::to_hex(ledger.output_hash); - msg += "\""; + msg += "{"; + populate_ledger_fields(msg, ledger); // If raw inputs or outputs is not requested, we don't include that field at all in the response. // Otherwise the field will always contain an array (empty array if no data). @@ -994,6 +1065,47 @@ namespace msg::usrmsg::json } } + void populate_ledger_fields(std::vector &msg, const ledger::ledger_record &ledger) + { + msg += "\""; + msg += msg::usrmsg::FLD_SEQ_NO; + msg += SEP_COLON_NOQUOTE; + msg += std::to_string(ledger.seq_no); + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_TIMESTAMP; + msg += SEP_COLON_NOQUOTE; + msg += std::to_string(ledger.timestamp); + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_HASH; + msg += SEP_COLON; + msg += util::to_hex(ledger.ledger_hash); + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_PREV_HASH; + msg += SEP_COLON; + msg += util::to_hex(ledger.prev_ledger_hash); + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_STATE_HASH; + msg += SEP_COLON; + msg += util::to_hex(ledger.state_hash); + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_CONFIG_HASH; + msg += SEP_COLON; + msg += util::to_hex(ledger.config_hash); + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_USER_HASH; + msg += SEP_COLON; + msg += util::to_hex(ledger.user_hash); + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_INPUT_HASH; + msg += SEP_COLON; + msg += util::to_hex(ledger.input_hash); + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_OUTPUT_HASH; + msg += SEP_COLON; + msg += util::to_hex(ledger.output_hash); + msg += "\""; + } + void populate_ledger_inputs(std::vector &msg, const std::vector &inputs) { msg += "["; diff --git a/src/msg/json/usrmsg_json.hpp b/src/msg/json/usrmsg_json.hpp index 1779a3b8..d06381e3 100644 --- a/src/msg/json/usrmsg_json.hpp +++ b/src/msg/json/usrmsg_json.hpp @@ -4,6 +4,7 @@ #include "../../pchheader.hpp" #include "../../util/merkle_hash_tree.hpp" #include "../../ledger/ledger_query.hpp" +#include "../../usr/user_common.hpp" namespace msg::usrmsg::json { @@ -25,7 +26,11 @@ namespace msg::usrmsg::json const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, const uint64_t lcl_seq_no, std::string_view lcl_hash); - void create_unl_list_container(std::vector &msg, const ::std::set &unl_list); + void create_unl_notification(std::vector &msg, const ::std::set &unl_list); + + void create_ledger_created_notification(std::vector &msg, const ledger::ledger_record &ledger); + + void create_sync_status_notification(std::vector &msg, const bool in_sync); void create_ledger_query_response(std::vector &msg, std::string_view reply_for, const ledger::query::query_result &result); @@ -45,6 +50,8 @@ namespace msg::usrmsg::json int extract_input_container(std::string &input, uint64_t &nonce, uint64_t &max_ledger_seq_no, std::string_view contentjson); + int extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled, const jsoncons::json &d); + int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id, const jsoncons::json &d); bool is_json_string(std::string_view content); @@ -53,6 +60,8 @@ namespace msg::usrmsg::json void populate_ledger_query_results(std::vector &msg, const std::vector &results); + void populate_ledger_fields(std::vector &msg, const ledger::ledger_record &ledger); + void populate_ledger_inputs(std::vector &msg, const std::vector &inputs); void populate_ledger_outputs(std::vector &msg, const std::vector &users); diff --git a/src/msg/usrmsg_common.hpp b/src/msg/usrmsg_common.hpp index e83a96f3..0be8d771 100644 --- a/src/msg/usrmsg_common.hpp +++ b/src/msg/usrmsg_common.hpp @@ -43,6 +43,7 @@ namespace msg::usrmsg constexpr const char *FLD_IS_FULL_HISTORY_NODE = "is_full_history_node"; constexpr const char *FLD_CURRENT_UNL = "current_unl"; constexpr const char *FLD_PEERS = "peers"; + constexpr const char *FLD_IN_SYNC = "in_sync"; constexpr const char *FLD_ID = "id"; constexpr const char *FLD_REPLY_FOR = "reply_for"; constexpr const char *FLD_FILTER_BY = "filter_by"; @@ -60,6 +61,10 @@ namespace msg::usrmsg constexpr const char *FLD_INPUTS = "inputs"; constexpr const char *FLD_BLOB = "blob"; constexpr const char *FLD_BLOBS = "blobs"; + constexpr const char *FLD_EVENT = "event"; + constexpr const char *FLD_LEDGER = "ledger"; + constexpr const char *FLD_CHANNEL = "channel"; + constexpr const char *FLD_ENABLED = "enabled"; // Message types constexpr const char *MSGTYPE_USER_CHALLENGE = "user_challenge"; @@ -75,8 +80,10 @@ namespace msg::usrmsg constexpr const char *MSGTYPE_LCL = "lcl"; constexpr const char *MSGTYPE_LCL_RESPONSE = "lcl_response"; constexpr const char *MSGTYPE_UNL_CHANGE = "unl_change"; + constexpr const char *MSGTYPE_LEDGER_EVENT = "ledger_event"; constexpr const char *MSGTYPE_LEDGER_QUERY = "ledger_query"; constexpr const char *MSGTYPE_LEDGER_QUERY_RESULT = "ledger_query_result"; + constexpr const char *MSGTYPE_SUBSCRIPTION = "subscription"; constexpr const char *MSGTYPE_UNKNOWN = "unknown"; // Values @@ -92,6 +99,11 @@ namespace msg::usrmsg constexpr const char *REASON_ALREADY_SUBMITTED = "already_submitted"; constexpr const char *REASON_ROUND_INPUTS_OVERFLOW = "round_inputs_overflow"; constexpr const char *QUERY_FILTER_BY_SEQ_NO = "seq_no"; + constexpr const char *STR_TRUE = "true"; + constexpr const char *STR_FALSE = "false"; + constexpr const char *LEDGER_EVENT_LEDGER_CREATED = "ledger_created"; + constexpr const char *LEDGER_EVENT_SYNC_STATUS = "sync_status"; + } // namespace msg::usrmsg diff --git a/src/msg/usrmsg_parser.cpp b/src/msg/usrmsg_parser.cpp index c45232ae..a42ae6a8 100644 --- a/src/msg/usrmsg_parser.cpp +++ b/src/msg/usrmsg_parser.cpp @@ -56,12 +56,28 @@ namespace msg::usrmsg busrmsg::create_contract_output_container(msg, hash, outputs, hash_root, unl_sig, lcl_seq_no, lcl_hash); } - void usrmsg_parser::create_unl_list_container(std::vector &msg, const ::std::set &unl_list) const + void usrmsg_parser::create_unl_notification(std::vector &msg, const std::set &unl_list) const { if (protocol == util::PROTOCOL::JSON) - jusrmsg::create_unl_list_container(msg, unl_list); + jusrmsg::create_unl_notification(msg, unl_list); else - busrmsg::create_unl_list_container(msg, unl_list); + busrmsg::create_unl_notification(msg, unl_list); + } + + void usrmsg_parser::create_ledger_created_notification(std::vector &msg, const ledger::ledger_record &ledger) const + { + if (protocol == util::PROTOCOL::JSON) + jusrmsg::create_ledger_created_notification(msg, ledger); + else + busrmsg::create_ledger_created_notification(msg, ledger); + } + + void usrmsg_parser::create_sync_status_notification(std::vector &msg, const bool in_sync) const + { + if (protocol == util::PROTOCOL::JSON) + jusrmsg::create_sync_status_notification(msg, in_sync); + else + busrmsg::create_sync_status_notification(msg, in_sync); } void usrmsg_parser::create_ledger_query_response(std::vector &msg, std::string_view reply_for, @@ -114,6 +130,14 @@ namespace msg::usrmsg return busrmsg::extract_input_container(input, nonce, max_ledger_seq_no, encoded_content); } + int usrmsg_parser::extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled) + { + if (protocol == util::PROTOCOL::JSON) + return jusrmsg::extract_subscription_request(channel, enabled, jdoc); + else + return busrmsg::extract_subscription_request(channel, enabled, bdoc); + } + int usrmsg_parser::extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id) const { if (protocol == util::PROTOCOL::JSON) diff --git a/src/msg/usrmsg_parser.hpp b/src/msg/usrmsg_parser.hpp index 87a84876..0efa11a0 100644 --- a/src/msg/usrmsg_parser.hpp +++ b/src/msg/usrmsg_parser.hpp @@ -5,12 +5,10 @@ #include "../util/util.hpp" #include "../util/merkle_hash_tree.hpp" #include "../ledger/ledger_query.hpp" +#include "../usr/user_common.hpp" namespace msg::usrmsg { - // Forward declaration - class usrmsg_parser; - class usrmsg_parser { const util::PROTOCOL protocol; @@ -33,7 +31,11 @@ namespace msg::usrmsg const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, const uint64_t lcl_seq_no, std::string_view lcl_hash) const; - void create_unl_list_container(std::vector &msg, const ::std::set &unl_list) const; + void create_unl_notification(std::vector &msg, const std::set &unl_list) const; + + void create_ledger_created_notification(std::vector &msg, const ledger::ledger_record &ledger) const; + + void create_sync_status_notification(std::vector &msg, const bool in_sync) const; void create_ledger_query_response(std::vector &msg, std::string_view reply_for, const ledger::query::query_result &result) const; @@ -49,6 +51,8 @@ namespace msg::usrmsg int extract_input_container(std::string &input, uint64_t &nonce, uint64_t &max_ledger_seq_no, std::string_view encoded_content) const; + int extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled); + int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id) const; }; diff --git a/src/status.cpp b/src/status.cpp index 77bc0d3f..a598ddb9 100644 --- a/src/status.cpp +++ b/src/status.cpp @@ -10,7 +10,8 @@ namespace status std::shared_mutex ledger_mutex; util::sequence_hash lcl_id; // Last ledger id/hash pair. ledger::ledger_record last_ledger; // Last ledger record that the node created. - bool is_in_sync = false; // Indicates whether this node is in sync with other nodes or not. + + std::atomic in_sync = false; // Indicates whether this node is in sync with other nodes or not. std::shared_mutex unl_mutex; std::set unl; // List of last reported unl binary pubkeys. @@ -25,7 +26,9 @@ namespace status // Not acquiring the mutex lock since this is called during startup only. lcl_id = ledger_id; last_ledger = ledger; - is_in_sync = true; + + // We assume we are not in sync unless otherwise found that we are. + in_sync = false; } void ledger_created(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger) @@ -33,13 +36,16 @@ namespace status std::unique_lock lock(ledger_mutex); lcl_id = ledger_id; last_ledger = ledger; - is_in_sync = true; // Creating a ledger automatically means we are in sync. + event_queue.try_enqueue(ledger_created_event{ledger}); } - void sync_status_changed(const bool in_sync) + void sync_status_changed(const bool new_in_sync) { - std::unique_lock lock(ledger_mutex); - is_in_sync = in_sync; + if (in_sync != new_in_sync) + { + in_sync = new_in_sync; + event_queue.try_enqueue(sync_status_change_event{new_in_sync}); + } } const util::sequence_hash get_lcl_id() @@ -48,6 +54,11 @@ namespace status return lcl_id; } + const bool is_in_sync() + { + return in_sync; + } + //----- UNL status void init_unl(const std::set &init_unl) diff --git a/src/status.hpp b/src/status.hpp index ebbf656b..e245da44 100644 --- a/src/status.hpp +++ b/src/status.hpp @@ -13,8 +13,18 @@ namespace status std::set unl; }; + struct ledger_created_event + { + ledger::ledger_record ledger; + }; + + struct sync_status_change_event + { + bool in_sync = false; + }; + // Represents any kind of change that has happened in the node. - typedef std::variant change_event; + typedef std::variant change_event; extern moodycamel::ConcurrentQueue event_queue; @@ -22,6 +32,7 @@ namespace status void ledger_created(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger); void sync_status_changed(const bool in_sync); const util::sequence_hash get_lcl_id(); + const bool is_in_sync(); void init_unl(const std::set &init_unl); void unl_changed(const std::set &new_unl); diff --git a/src/usr/user_common.hpp b/src/usr/user_common.hpp new file mode 100644 index 00000000..f1cad212 --- /dev/null +++ b/src/usr/user_common.hpp @@ -0,0 +1,15 @@ +#ifndef _HP_USR_USER_COMMON_ +#define _HP_USR_USER_COMMON_ + +namespace usr +{ + // List of notification channels users can subscribe to. + enum NOTIFICATION_CHANNEL + { + UNL_CHANGE = 0, + LEDGER_EVENT = 1 + }; + +} // namespace usr + +#endif \ No newline at end of file diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 0724090d..80b6876b 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -246,6 +246,16 @@ namespace usr user.session.send(resp); return 0; } + else if (msg_type == msg::usrmsg::MSGTYPE_SUBSCRIPTION) + { + NOTIFICATION_CHANNEL channel; + bool enabled; + if (parser.extract_subscription_request(channel, enabled) == -1) + return -1; + + user.subscriptions[channel] = enabled; + return 0; + } else if (msg_type == msg::usrmsg::MSGTYPE_LEDGER_QUERY) { ledger::query::query_request req; @@ -547,25 +557,50 @@ namespace usr // Array to hold constructed message cache from each protocol. std::vector protocol_msgs[2]; - if (ev.index() == 0) // UNL change event. Broadcast for all users. + if (ev.index() == 0) // UNL change event. Broadcast for subscribed users. { - const status::unl_change_event &unl_ev = std::get(ev); - std::scoped_lock lock(ctx.users_mutex); for (auto &[sid, user] : ctx.users) { - std::vector &msg = protocol_msgs[user.protocol]; - if (msg.empty()) // Construct the message with relevant protocol if not done so already. + if (user.subscriptions[NOTIFICATION_CHANNEL::UNL_CHANGE]) { - msg::usrmsg::usrmsg_parser parser(user.protocol); - parser.create_unl_list_container(msg, unl_ev.unl); + std::vector &msg = protocol_msgs[user.protocol]; + if (msg.empty()) // Construct the message with relevant protocol if not done so already. + { + msg::usrmsg::usrmsg_parser parser(user.protocol); + const status::unl_change_event &unl_ev = std::get(ev); + parser.create_unl_notification(msg, unl_ev.unl); + } + user.session.send(msg); } - user.session.send(msg); } + } + else if (ev.index() == 1 || ev.index() == 2) // Ledger events. Broadcast for subscribed users. + { + std::scoped_lock lock(ctx.users_mutex); + for (auto &[sid, user] : ctx.users) + { + if (user.subscriptions[NOTIFICATION_CHANNEL::LEDGER_EVENT]) + { + std::vector &msg = protocol_msgs[user.protocol]; + if (msg.empty()) // Construct the message with relevant protocol if not done so already. + { + msg::usrmsg::usrmsg_parser parser(user.protocol); - // Clear the caches for the next event. - protocol_msgs[util::PROTOCOL::JSON].clear(); - protocol_msgs[util::PROTOCOL::BSON].clear(); + if (ev.index() == 1) // Ledger created event. + { + const status::ledger_created_event &ledger_ev = std::get(ev); + parser.create_ledger_created_notification(msg, ledger_ev.ledger); + } + else if (ev.index() == 2) // Sync status chnge event. + { + const status::sync_status_change_event &sync_ev = std::get(ev); + parser.create_sync_status_notification(msg, sync_ev.in_sync); + } + } + user.session.send(msg); + } + } } } } diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 141af417..faeee854 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -11,6 +11,7 @@ #include "user_comm_server.hpp" #include "user_session_handler.hpp" #include "user_input.hpp" +#include "user_common.hpp" /** * Maintains the global user list with pending input outputs and manages user connections. @@ -34,6 +35,9 @@ namespace usr // Total input bytes collected which are pending to be subjected to consensus. size_t collected_input_size = 0; + // User's notification subscription toggles. + bool subscriptions[2]; + // Holds the websocket session of this user. // We don't need to own the session object since the lifetime of user and session are coupled. usr::user_comm_session &session; @@ -48,6 +52,9 @@ namespace usr connected_user(usr::user_comm_session &session, std::string_view pubkey, util::PROTOCOL protocol) : session(session), pubkey(pubkey), protocol(protocol) { + // Default subscriptions. + subscriptions[NOTIFICATION_CHANNEL::UNL_CHANGE] = false; + subscriptions[NOTIFICATION_CHANNEL::LEDGER_EVENT] = false; } };