Event subscription infrastructure and ledger events. (#319)

- Introduced event subscription infrastructure for users.
- Added ability to subscribe to ledger events.
- Updated client lib for event subscription support.
This commit is contained in:
Ravin Perera
2021-06-07 10:24:58 +05:30
committed by GitHub
parent a7b7f6c9de
commit cc11ebd7b3
16 changed files with 599 additions and 152 deletions

View File

@@ -52,11 +52,6 @@
console.log(server + " " + action);
})
// This will get when the unl public key list changes.
hpc.on(HotPocket.events.unlChange, (unl) => {
console.log("New unl received: " + JSON.stringify(unl)); // unl is an array of hex public keys.
})
// This will get fired when contract sends an output.
hpc.on(HotPocket.events.contractOutput, (r) => {
r.outputs.forEach(o => console.log(`Output (ledger:${r.ledgerSeqNo})>> ${o}`));
@@ -70,6 +65,17 @@
console.log("Read response>> " + response);
})
// This will get fired when the unl public key list changes.
hpc.on(HotPocket.events.unlChange, (unl) => {
console.log("New unl received:");
console.log(unl); // unl is an array of public keys.
})
// This will get fired when any ledger event occurs (ledger created, sync status change).
hpc.on(HotPocket.events.ledgerEvent, (ev) => {
console.log(ev);
})
// Establish HotPocket connection.
if (!await hpc.connect()) {
console.log('Connection failed.');
@@ -77,6 +83,10 @@
}
console.log('HotPocket Connected.');
// After connecting, we can subscribe to events from the HotPocket node.
// await hpc.subscribe(HotPocket.notificationChannels.unlChange);
// await hpc.subscribe(HotPocket.notificationChannels.ledgerEvent);
hpc.sendContractReadRequest("Hello");
const input = await hpc.submitContractInput("World!")

View File

@@ -47,13 +47,21 @@
/*--- Included in public interface. ---*/
const events = {
disconnect: "disconnect",
contractOutput: "contractOutput",
contractReadResponse: "contractReadResponse",
connectionChange: "connectionChange",
unlChange: "unlChange"
contractOutput: "contract_output",
contractReadResponse: "contract_read_response",
connectionChange: "connection_change",
unlChange: "unl_change",
ledgerEvent: "ledger_event"
}
Object.freeze(events);
/*--- Included in public interface. ---*/
const notificationChannels = {
unlChange: "unl_change",
ledgerEvent: "ledger_event"
}
Object.freeze(notificationChannels);
/*--- Included in public interface. ---*/
// privateKeyHex: Hex private key with prefix ('ed').
// Returns 'ed' (237) prefixed binary public/private keys.
@@ -143,6 +151,8 @@
if (key.length > 0)
trustedKeysLookup[key] = true
});
// If there are no keys specified, mark the lookup as null, indicating that we are not intersted in
// checking for trusted servers.
if (Object.keys(trustedKeysLookup).length == 0)
trustedKeysLookup = null;
@@ -153,15 +163,9 @@
let emitter = new EventEmitter();
// The accessor function passed into connections to query latest trusted key list.
// We update the returning key list whenever we get a unl update.
// The accessor functions passed into connections to query and set latest trusted key list.
const getTrustedKeys = () => trustedKeysLookup;
// Whenever unl change is reported, update the trusted key list.
emitter.on(events.unlChange, (unl) => {
trustedKeysLookup = {};
unl.sort().forEach(pubkey => trustedKeysLookup[pubkey] = true);
})
const setTrustedKeys = (newKeys) => (trustedKeysLookup = newKeys);
const nodes = Object.keys(serversLookup).map(s => {
return {
@@ -171,6 +175,12 @@
}
});
// Default subscriptions.
const subscriptions = {};
// Subscribe for unl changes if we have to maintain the trusted server key checks.
subscriptions[notificationChannels.unlChange] = trustedKeysLookup ? true : false;
subscriptions[notificationChannels.ledgerEvent] = false;
let status = 0; //0:none, 1:connected, 2:closed
// This will get fired whenever the required connection count gets fullfilled.
@@ -241,14 +251,24 @@
// Get the next available node.
const n = freeNodes.shift();
n.connection = new HotPocketConnection(contractId, contractVersion, clientKeys, n.server, getTrustedKeys, protocol, connectionTimeoutMs, emitter);
n.connection = new HotPocketConnection(
contractId, contractVersion, clientKeys, n.server,
getTrustedKeys, setTrustedKeys, protocol, connectionTimeoutMs, emitter);
n.lastActivity = new Date().getTime();
n.connection.connect().then(success => {
if (!success)
if (!success) {
n.connection = null;
else
}
else {
emitter && emitter.emit(events.connectionChange, n.server, "add");
// Issue subscription request for any subscriptions we have to maintain.
// We don't wait for completion because we just need to issue the request to the server.
for (const [channel, enabled] of Object.entries(subscriptions)) {
if (enabled) n.connection.subscribe(channel)
}
}
});
n.connection.onClose = () => {
@@ -360,12 +380,24 @@
return getMultiConnectionResult(con => con.getLcl());
}
this.subscribe = (channel) => {
subscriptions[channel] = true;
return executeMultiConnectionFunc(con => con.subscribe(channel));
}
this.unsubscribe = (channel) => {
subscriptions[channel] = false;
return executeMultiConnectionFunc(con => con.unsubscribe(channel));
}
this.getLedgerBySeqNo = (seqNo, includeInputs, includeOutputs) => {
return getMultiConnectionResult(con => con.getLedgerBySeqNo(seqNo, includeInputs, includeOutputs));
}
}
function HotPocketConnection(contractId, contractVersion, clientKeys, server, getTrustedKeys, protocol, connectionTimeoutMs, emitter) {
function HotPocketConnection(
contractId, contractVersion, clientKeys, server,
getTrustedKeys, setTrustedKeys, protocol, connectionTimeoutMs, emitter) {
// Create message helper with JSON protocol initially.
// After challenge handshake, we will change this to use the protocol specified by user.
@@ -491,18 +523,24 @@
return false;
}
const validateAndEmitUnlChange = (changedUnl) => {
// If this is currently a trusted connection, notify unl update.
const trustedKeys = getTrustedKeys();
if (trustedKeys && trustedKeys[pubkey]) {
// Prepare sorted new unl lookup object for equality comparison.
const newUnl = {};
changedUnl.sort().forEach(k => newUnl[k] = true);
const processUnlUpdate = (unl, isHandshake) => {
// Only emit unl change event if the unl has really changed.
if (JSON.stringify(trustedKeys) != JSON.stringify(newUnl))
emitter && emitter.emit(events.unlChange, changedUnl);
unl = unl.map(k => msgHelper.deserializeValue(k)).sort();
// If this is currently a trusted connection, update the trusted key set with the received unl.
let trustedKeys = getTrustedKeys();
if (trustedKeys && trustedKeys[pubkey]) {
trustedKeys = {}; // reset the object and reinitialize the list.
// Convert unl pubkeys to hex string so we can use them as lookup object keys.
const hexUnl = unl.map(k => msgHelper.stringifyValue(k));
hexUnl.forEach(k => trustedKeys[k] = true);
setTrustedKeys(trustedKeys);
liblog(0, "Updated trusted keys.");
}
if (!isHandshake)
emitter && emitter.emit(events.unlChange, unl);
}
const handshakeMessageHandler = (m) => {
@@ -556,17 +594,17 @@
clearTimeout(handshakeTimer); // Cancel the handshake timeout monitor.
handshakeTimer = null;
serverChallenge = null; // Clear the sent challenge as we no longer need it.
msgHelper.useProtocol(protocol); // Here onwards, use the message protocol specified by user.
pubkey = m.pubkey; // Set this connection's public key.
connectionStatus = 2; // Handshake complete.
processUnlUpdate(m.unl, true);
msgHelper.useProtocol(protocol); // Here onwards, use the message protocol specified by user.
// If we are still connected, report handshaking as successful.
// (If websocket disconnects, handshakeResolver will be already null)
handshakeResolver && handshakeResolver(true);
liblog(0, `Connected to ${server}`);
validateAndEmitUnlChange(m.unl);
return true;
}
@@ -621,6 +659,7 @@
hpVersion: m.hp_version,
ledgerSeqNo: m.ledger_seq_no,
ledgerHash: msgHelper.deserializeValue(m.ledger_hash),
inSync: m.in_sync,
roundTime: m.round_time,
contractExecutionEnabled: m.contract_execution_enabled,
readRequestsEnabled: m.read_requests_enabled,
@@ -641,27 +680,21 @@
lclResponseResolvers = [];
}
else if (m.type == "unl_change") {
if (m.unl) {
// Convert unl pubkeys to hex string.
let unl = m.unl.map(k => msgHelper.stringifyValue(k));
validateAndEmitUnlChange(unl);
}
processUnlUpdate(m.unl, false);
}
else if (m.type == "ledger_event") {
const ev = { event: m.event };
if (ev.event == "ledger_created")
ev.ledger = msgHelper.deserializeLedger(m.ledger);
else if (ev.event == "sync_status")
ev.inSync = m.in_sync;
emitter.emit(events.ledgerEvent, ev);
}
else if (m.type == "ledger_query_result") {
const resolver = ledgerQueryResolvers[m.reply_for];
if (resolver) {
const results = m.results.map(r => {
const result = {
seqNo: r.seq_no,
timestamp: r.timestamp,
hash: msgHelper.deserializeValue(r.hash),
prevHash: msgHelper.deserializeValue(r.prev_hash),
stateHash: msgHelper.deserializeValue(r.state_hash),
configHash: msgHelper.deserializeValue(r.config_hash),
userHash: msgHelper.deserializeValue(r.user_hash),
inputHash: msgHelper.deserializeValue(r.input_hash),
outputHash: msgHelper.deserializeValue(r.output_hash)
}
const result = msgHelper.deserializeLedger(r);
if (r.inputs) {
result.inputs = r.inputs.map(i => {
@@ -912,6 +945,24 @@
return Promise.resolve();
}
this.subscribe = (channel) => {
if (connectionStatus != 2)
return Promise.resolve();
const msg = msgHelper.createSubscriptionRequest(channel, true);
wsSend(msgHelper.serializeObject(msg));
return Promise.resolve();
}
this.unsubscribe = (channel) => {
if (connectionStatus != 2)
return Promise.resolve();
const msg = msgHelper.createSubscriptionRequest(channel, false);
wsSend(msgHelper.serializeObject(msg));
return Promise.resolve();
}
this.getLedgerBySeqNo = (seqNo, includeInputs, includeOutputs) => {
if (connectionStatus != 2)
return Promise.resolve(null);
@@ -1054,6 +1105,14 @@
return { type: "lcl" };
}
this.createSubscriptionRequest = (channel, enabled) => {
return {
type: "subscription",
channel: channel,
enabled: enabled
}
}
this.createLedgerQuery = (filterBy, params, includeInputs, includeOutputs) => {
const includes = [];
@@ -1068,6 +1127,20 @@
include: includes
}
}
this.deserializeLedger = (l) => {
return {
seqNo: l.seq_no,
timestamp: l.timestamp,
hash: this.deserializeValue(l.hash),
prevHash: this.deserializeValue(l.prev_hash),
stateHash: this.deserializeValue(l.state_hash),
configHash: this.deserializeValue(l.config_hash),
userHash: this.deserializeValue(l.user_hash),
inputHash: this.deserializeValue(l.input_hash),
outputHash: this.deserializeValue(l.output_hash)
}
}
}
function hexToUint8Array(hexString) {
@@ -1245,6 +1318,7 @@
generateKeys,
createClient,
events,
notificationChannels,
protocols,
setLogLevel
};
@@ -1254,6 +1328,7 @@
generateKeys,
createClient,
events,
notificationChannels,
protocols,
setLogLevel
};

View File

@@ -52,11 +52,6 @@ async function main() {
console.log(server + " " + action);
})
// This will get when the unl public key list changes.
hpc.on(HotPocket.events.unlChange, (unl) => {
console.log("New unl received: " + JSON.stringify(unl)); // unl is an array of hex public keys.
})
// This will get fired when contract sends outputs.
hpc.on(HotPocket.events.contractOutput, (r) => {
r.outputs.forEach(o => console.log(`Output (ledger:${r.ledgerSeqNo})>> ${o}`));
@@ -67,6 +62,17 @@ async function main() {
console.log("Read response>> " + response);
})
// This will get fired when the unl public key list changes.
hpc.on(HotPocket.events.unlChange, (unl) => {
console.log("New unl received:");
console.log(unl); // unl is an array of public keys.
})
// This will get fired when any ledger event occurs (ledger created, sync status change).
hpc.on(HotPocket.events.ledgerEvent, (ev) => {
console.log(ev);
})
// Establish HotPocket connection.
if (!await hpc.connect()) {
console.log('Connection failed.');
@@ -74,6 +80,10 @@ async function main() {
}
console.log('HotPocket Connected.');
// After connecting, we can subscribe to events from the HotPocket node.
// await hpc.subscribe(HotPocket.notificationChannels.unlChange);
// await hpc.subscribe(HotPocket.notificationChannels.ledgerEvent);
// start listening for stdin
const rl = readline.createInterface({
input: process.stdin,

View File

@@ -157,9 +157,10 @@ namespace consensus
new_sync_status = check_sync_status(unl_count, votes, lcl_id);
}
// Update the status if the sync status changed.
if ((ctx.sync_status != 0 && new_sync_status == 0) || (ctx.sync_status == 0 && new_sync_status != 0))
status::sync_status_changed(new_sync_status == 0);
// Update the sync status if we went from in-sync to not-in-sync. We will report back as being in-sync
// only when we hit stage 3.
if (ctx.sync_status == 0 && new_sync_status != 0)
status::sync_status_changed(false);
ctx.sync_status = new_sync_status;
}
@@ -187,6 +188,8 @@ namespace consensus
// Upon successful consensus at stage 3, update the ledger and execute the contract using the consensus proposal.
if (ctx.stage == 3)
{
status::sync_status_changed(true); // Creating a new ledger means we are in sync.
consensed_user_map consensed_users;
if (prepare_consensed_users(consensed_users, p) == -1 ||
commit_consensus_results(p, consensed_users, patch_hash) == -1)

View File

@@ -32,6 +32,7 @@ namespace msg::usrmsg::bson
{
const util::sequence_hash lcl_id = status::get_lcl_id();
const std::set<std::string> unl = status::get_unl();
const bool in_sync = status::is_in_sync();
jsoncons::bson::bson_bytes_encoder encoder(msg);
encoder.begin_object();
@@ -43,6 +44,8 @@ namespace msg::usrmsg::bson
encoder.int64_value(lcl_id.seq_no);
encoder.key(msg::usrmsg::FLD_LEDGER_HASH);
encoder.byte_string_value(lcl_id.hash.to_string_view());
encoder.key(msg::usrmsg::FLD_IN_SYNC);
encoder.bool_value(in_sync);
encoder.key(msg::usrmsg::FLD_ROUND_TIME);
encoder.uint64_value(conf::cfg.contract.roundtime);
encoder.key(msg::usrmsg::FLD_CONTARCT_EXECUTION_ENABLED);
@@ -236,7 +239,7 @@ namespace msg::usrmsg::bson
}
/**
* Constructs unl list container message.
* Constructs unl change notification message.
* @param msg Buffer to construct the generated bson message string into.
* Message format:
* {
@@ -245,7 +248,7 @@ namespace msg::usrmsg::bson
* }
* @param unl_list The unl node pubkey list to be put in the message.
*/
void create_unl_list_container(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list)
void create_unl_notification(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list)
{
jsoncons::bson::bson_bytes_encoder encoder(msg);
encoder.begin_object();
@@ -260,9 +263,61 @@ namespace msg::usrmsg::bson
encoder.flush();
}
/**
* Constructs ledger created notification message.
* @param msg Buffer to construct the generated bson message string into.
* Message format:
* {
* "type": "ledger_event",
* "event": "ledger_created",
* "ledger": { ... }
* }
* @param ledger The created ledger.
*/
void create_ledger_created_notification(std::vector<uint8_t> &msg, const ledger::ledger_record &ledger)
{
jsoncons::bson::bson_bytes_encoder encoder(msg);
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_TYPE);
encoder.string_value(msg::usrmsg::MSGTYPE_LEDGER_EVENT);
encoder.key(msg::usrmsg::FLD_EVENT);
encoder.string_value(msg::usrmsg::LEDGER_EVENT_LEDGER_CREATED);
encoder.key(msg::usrmsg::FLD_LEDGER);
encoder.begin_object();
populate_ledger_fields(encoder, ledger);
encoder.end_object();
encoder.end_object();
encoder.flush();
}
/**
* Constructs sync status notification message.
* @param msg Buffer to construct the generated bson message string into.
* Message format:
* {
* "type": "ledger_event",
* "event": "sync_status",
* "in_sync": true | false
* }
* @param in_sync Whether the node is in sync or not.
*/
void create_sync_status_notification(std::vector<uint8_t> &msg, const bool in_sync)
{
jsoncons::bson::bson_bytes_encoder encoder(msg);
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_TYPE);
encoder.string_value(msg::usrmsg::MSGTYPE_LEDGER_EVENT);
encoder.key(msg::usrmsg::FLD_EVENT);
encoder.string_value(msg::usrmsg::LEDGER_EVENT_SYNC_STATUS);
encoder.key(msg::usrmsg::FLD_IN_SYNC);
encoder.bool_value(in_sync);
encoder.end_object();
encoder.flush();
}
/**
* Constructs a ledger query response.
* @param msg Buffer to construct the generated json message string into.
* @param msg Buffer to construct the generated bson message string into.
* Message format:
* {
* "type": "ledger_query_result",
@@ -448,6 +503,46 @@ namespace msg::usrmsg::bson
return 0;
}
/**
* Extract ledger event subscription request.
* @param channel Extracted subscription channel.
* @param enabled Whether the subscription is enabled or not.
* @param d The json document holding the subscription request.
* Accepted message format:
* {
* "type": "subscription",
* "channel": "unl_change" | "ledger_event",
* "enabled": true | false
* }
* @return 0 on successful extraction. -1 for failure.
*/
int extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled, const jsoncons::ojson &d)
{
if (!d.contains(msg::usrmsg::FLD_CHANNEL) || !d.contains(msg::usrmsg::FLD_ENABLED) ||
!d[msg::usrmsg::FLD_CHANNEL].is<std::string>() || !d[msg::usrmsg::FLD_ENABLED].is<bool>())
{
LOG_DEBUG << "User subscription request required fields missing or invalid.";
return -1;
}
if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_LEDGER_EVENT)
{
channel = usr::NOTIFICATION_CHANNEL::LEDGER_EVENT;
}
else if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_UNL_CHANGE)
{
channel = usr::NOTIFICATION_CHANNEL::UNL_CHANGE;
}
else
{
LOG_DEBUG << "User subscription request invalid channel.";
return -1;
}
enabled = d[msg::usrmsg::FLD_ENABLED].as<bool>();
return 0;
}
/**
* Extract query information from a ledger query request.
* @param extracted_query Extracted query criteria.
@@ -547,24 +642,7 @@ namespace msg::usrmsg::bson
for (const ledger::ledger_record &ledger : results)
{
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_SEQ_NO);
encoder.uint64_value(ledger.seq_no);
encoder.key(msg::usrmsg::FLD_TIMESTAMP);
encoder.uint64_value(ledger.timestamp);
encoder.key(msg::usrmsg::FLD_HASH);
encoder.byte_string_value(ledger.ledger_hash);
encoder.key(msg::usrmsg::FLD_PREV_HASH);
encoder.byte_string_value(ledger.prev_ledger_hash);
encoder.key(msg::usrmsg::FLD_STATE_HASH);
encoder.byte_string_value(ledger.state_hash);
encoder.key(msg::usrmsg::FLD_CONFIG_HASH);
encoder.byte_string_value(ledger.config_hash);
encoder.key(msg::usrmsg::FLD_USER_HASH);
encoder.byte_string_value(ledger.user_hash);
encoder.key(msg::usrmsg::FLD_INPUT_HASH);
encoder.byte_string_value(ledger.input_hash);
encoder.key(msg::usrmsg::FLD_OUTPUT_HASH);
encoder.byte_string_value(ledger.output_hash);
populate_ledger_fields(encoder, ledger);
// If raw inputs or outputs is not requested, we don't include that field at all in the response.
// Otherwise the field will always contain an array (empty array if no data).
@@ -585,6 +663,28 @@ namespace msg::usrmsg::bson
}
}
void populate_ledger_fields(jsoncons::bson::bson_bytes_encoder &encoder, const ledger::ledger_record &ledger)
{
encoder.key(msg::usrmsg::FLD_SEQ_NO);
encoder.uint64_value(ledger.seq_no);
encoder.key(msg::usrmsg::FLD_TIMESTAMP);
encoder.uint64_value(ledger.timestamp);
encoder.key(msg::usrmsg::FLD_HASH);
encoder.byte_string_value(ledger.ledger_hash);
encoder.key(msg::usrmsg::FLD_PREV_HASH);
encoder.byte_string_value(ledger.prev_ledger_hash);
encoder.key(msg::usrmsg::FLD_STATE_HASH);
encoder.byte_string_value(ledger.state_hash);
encoder.key(msg::usrmsg::FLD_CONFIG_HASH);
encoder.byte_string_value(ledger.config_hash);
encoder.key(msg::usrmsg::FLD_USER_HASH);
encoder.byte_string_value(ledger.user_hash);
encoder.key(msg::usrmsg::FLD_INPUT_HASH);
encoder.byte_string_value(ledger.input_hash);
encoder.key(msg::usrmsg::FLD_OUTPUT_HASH);
encoder.byte_string_value(ledger.output_hash);
}
void populate_ledger_inputs(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector<ledger::ledger_user_input> &inputs)
{
encoder.begin_array();

View File

@@ -4,6 +4,7 @@
#include "../../pchheader.hpp"
#include "../../util/merkle_hash_tree.hpp"
#include "../../ledger/ledger_query.hpp"
#include "../../usr/user_common.hpp"
namespace msg::usrmsg::bson
{
@@ -21,7 +22,11 @@ namespace msg::usrmsg::bson
const util::merkle_hash_node &hash_root, const std::vector<std::pair<std::string, std::string>> &unl_sig,
const uint64_t lcl_seq_no, std::string_view lcl_hash);
void create_unl_list_container(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list);
void create_unl_notification(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list);
void create_ledger_created_notification(std::vector<uint8_t> &msg, const ledger::ledger_record &ledger);
void create_sync_status_notification(std::vector<uint8_t> &msg, const bool in_sync);
void create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result);
@@ -41,12 +46,16 @@ namespace msg::usrmsg::bson
int extract_input_container(std::string &input, uint64_t &nonce,
uint64_t &max_ledger_seq_no, std::string_view contentbson);
int extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled, const jsoncons::ojson &d);
int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id, const jsoncons::ojson &d);
void populate_output_hash_array(jsoncons::bson::bson_bytes_encoder &encoder, const util::merkle_hash_node &node);
void populate_ledger_query_results(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector<ledger::ledger_record> &results);
void populate_ledger_fields(jsoncons::bson::bson_bytes_encoder &encoder, const ledger::ledger_record &ledger);
void populate_ledger_inputs(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector<ledger::ledger_user_input> &inputs);
void populate_ledger_outputs(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector<ledger::ledger_user_output> &users);

View File

@@ -151,10 +151,9 @@ namespace msg::usrmsg::json
{
const util::sequence_hash lcl_id = status::get_lcl_id();
const std::set<std::string> unl = status::get_unl();
const bool in_sync = status::is_in_sync();
const uint16_t msg_length = 406 + (69 * unl.size());
msg.reserve(msg_length);
msg.reserve(1024);
msg += "{\"";
msg += msg::usrmsg::FLD_TYPE;
msg += SEP_COLON;
@@ -172,21 +171,25 @@ namespace msg::usrmsg::json
msg += SEP_COLON;
msg += util::to_hex(lcl_id.hash.to_string_view());
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_IN_SYNC;
msg += SEP_COLON_NOQUOTE;
msg += in_sync ? STR_TRUE : STR_FALSE;
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_ROUND_TIME;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(conf::cfg.contract.roundtime);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_CONTARCT_EXECUTION_ENABLED;
msg += SEP_COLON_NOQUOTE;
msg += conf::cfg.contract.execute ? "true" : "false";
msg += conf::cfg.contract.execute ? STR_TRUE : STR_FALSE;
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_READ_REQUESTS_ENABLED;
msg += SEP_COLON_NOQUOTE;
msg += conf::cfg.user.concurrent_read_reqeuests != 0 ? "true" : "false";
msg += conf::cfg.user.concurrent_read_reqeuests != 0 ? STR_TRUE : STR_FALSE;
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_IS_FULL_HISTORY_NODE;
msg += SEP_COLON_NOQUOTE;
msg += conf::cfg.node.history == conf::HISTORY::FULL ? "true" : "false";
msg += conf::cfg.node.history == conf::HISTORY::FULL ? STR_TRUE : STR_FALSE;
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_CURRENT_UNL;
msg += SEP_COLON_NOQUOTE;
@@ -451,7 +454,7 @@ namespace msg::usrmsg::json
}
/**
* Constructs unl list container message.
* Constructs unl change notification message.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
@@ -460,7 +463,7 @@ namespace msg::usrmsg::json
* }
* @param unl_list The unl node pubkey list to be put in the message.
*/
void create_unl_list_container(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list)
void create_unl_notification(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list)
{
msg.reserve((69 * unl_list.size()) + 30);
msg += "{\"";
@@ -485,6 +488,64 @@ namespace msg::usrmsg::json
msg += "]}";
}
/**
* Constructs ledger created notification message.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "ledger_event",
* "event": "ledger_created",
* "ledger": { ... }
* }
* @param ledger The created ledger.
*/
void create_ledger_created_notification(std::vector<uint8_t> &msg, const ledger::ledger_record &ledger)
{
msg.reserve(1024);
msg += "{\"";
msg += msg::usrmsg::FLD_TYPE;
msg += SEP_COLON;
msg += msg::usrmsg::MSGTYPE_LEDGER_EVENT;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_EVENT;
msg += SEP_COLON;
msg += msg::usrmsg::LEDGER_EVENT_LEDGER_CREATED;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_LEDGER;
msg += "\":{";
populate_ledger_fields(msg, ledger);
msg += "}}";
}
/**
* Constructs sync status notification message.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "ledger_event",
* "event": "sync_status",
* "in_sync": true | false
* }
* @param in_sync Whether the node is in sync or not.
*/
void create_sync_status_notification(std::vector<uint8_t> &msg, const bool in_sync)
{
msg.reserve(128);
msg += "{\"";
msg += msg::usrmsg::FLD_TYPE;
msg += SEP_COLON;
msg += msg::usrmsg::MSGTYPE_LEDGER_EVENT;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_EVENT;
msg += SEP_COLON;
msg += msg::usrmsg::LEDGER_EVENT_SYNC_STATUS;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_IN_SYNC;
msg += SEP_COLON_NOQUOTE;
msg += in_sync ? STR_TRUE : STR_FALSE;
msg += "}";
}
/**
* Constructs a ledger query response.
* @param msg Buffer to construct the generated json message string into.
@@ -796,6 +857,51 @@ namespace msg::usrmsg::json
return 0;
}
/**
* Extract ledger event subscription request.
* @param channel Extracted subscription channel.
* @param enabled Whether the subscription is enabled or not.
* @param d The json document holding the subscription request.
* Accepted message format:
* {
* "type": "subscription",
* "channel": "unl_change" | "ledger_event",
* "enabled": true | false
* }
* @return 0 on successful extraction. -1 for failure.
*/
int extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled, const jsoncons::json &d)
{
if (!d.contains(msg::usrmsg::FLD_CHANNEL) || !d.contains(msg::usrmsg::FLD_ENABLED))
{
LOG_DEBUG << "User subscription request required fields missing.";
return -1;
}
if (!d[msg::usrmsg::FLD_CHANNEL].is<std::string>() || !d[msg::usrmsg::FLD_ENABLED].is<bool>())
{
LOG_DEBUG << "User subscription request invalid field values.";
return -1;
}
if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_LEDGER_EVENT)
{
channel = usr::NOTIFICATION_CHANNEL::LEDGER_EVENT;
}
else if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_UNL_CHANGE)
{
channel = usr::NOTIFICATION_CHANNEL::UNL_CHANGE;
}
else
{
LOG_DEBUG << "User subscription request invalid channel.";
return -1;
}
enabled = d[msg::usrmsg::FLD_ENABLED].as<bool>();
return 0;
}
/**
* Extract query information from a ledger query request.
* @param extracted_query Extracted query criteria.
@@ -880,7 +986,7 @@ namespace msg::usrmsg::json
if ((first == '\"' && last == '\"') ||
(first == '{' && last == '}') ||
(first == '[' && last == ']') ||
content == "true" || content == "false")
content == STR_TRUE || content == STR_FALSE)
return false;
// Check whether all characters are digits.
@@ -933,43 +1039,8 @@ namespace msg::usrmsg::json
{
const ledger::ledger_record &ledger = results[i];
msg += "{\"";
msg += msg::usrmsg::FLD_SEQ_NO;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(ledger.seq_no);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_TIMESTAMP;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(ledger.timestamp);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.ledger_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_PREV_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.prev_ledger_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_STATE_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.state_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_CONFIG_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.config_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_USER_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.user_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_INPUT_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.input_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_OUTPUT_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.output_hash);
msg += "\"";
msg += "{";
populate_ledger_fields(msg, ledger);
// If raw inputs or outputs is not requested, we don't include that field at all in the response.
// Otherwise the field will always contain an array (empty array if no data).
@@ -994,6 +1065,47 @@ namespace msg::usrmsg::json
}
}
void populate_ledger_fields(std::vector<uint8_t> &msg, const ledger::ledger_record &ledger)
{
msg += "\"";
msg += msg::usrmsg::FLD_SEQ_NO;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(ledger.seq_no);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_TIMESTAMP;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(ledger.timestamp);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.ledger_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_PREV_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.prev_ledger_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_STATE_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.state_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_CONFIG_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.config_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_USER_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.user_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_INPUT_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.input_hash);
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_OUTPUT_HASH;
msg += SEP_COLON;
msg += util::to_hex(ledger.output_hash);
msg += "\"";
}
void populate_ledger_inputs(std::vector<uint8_t> &msg, const std::vector<ledger::ledger_user_input> &inputs)
{
msg += "[";

View File

@@ -4,6 +4,7 @@
#include "../../pchheader.hpp"
#include "../../util/merkle_hash_tree.hpp"
#include "../../ledger/ledger_query.hpp"
#include "../../usr/user_common.hpp"
namespace msg::usrmsg::json
{
@@ -25,7 +26,11 @@ namespace msg::usrmsg::json
const util::merkle_hash_node &hash_root, const std::vector<std::pair<std::string, std::string>> &unl_sig,
const uint64_t lcl_seq_no, std::string_view lcl_hash);
void create_unl_list_container(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list);
void create_unl_notification(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list);
void create_ledger_created_notification(std::vector<uint8_t> &msg, const ledger::ledger_record &ledger);
void create_sync_status_notification(std::vector<uint8_t> &msg, const bool in_sync);
void create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result);
@@ -45,6 +50,8 @@ namespace msg::usrmsg::json
int extract_input_container(std::string &input, uint64_t &nonce,
uint64_t &max_ledger_seq_no, std::string_view contentjson);
int extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled, const jsoncons::json &d);
int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id, const jsoncons::json &d);
bool is_json_string(std::string_view content);
@@ -53,6 +60,8 @@ namespace msg::usrmsg::json
void populate_ledger_query_results(std::vector<uint8_t> &msg, const std::vector<ledger::ledger_record> &results);
void populate_ledger_fields(std::vector<uint8_t> &msg, const ledger::ledger_record &ledger);
void populate_ledger_inputs(std::vector<uint8_t> &msg, const std::vector<ledger::ledger_user_input> &inputs);
void populate_ledger_outputs(std::vector<uint8_t> &msg, const std::vector<ledger::ledger_user_output> &users);

View File

@@ -43,6 +43,7 @@ namespace msg::usrmsg
constexpr const char *FLD_IS_FULL_HISTORY_NODE = "is_full_history_node";
constexpr const char *FLD_CURRENT_UNL = "current_unl";
constexpr const char *FLD_PEERS = "peers";
constexpr const char *FLD_IN_SYNC = "in_sync";
constexpr const char *FLD_ID = "id";
constexpr const char *FLD_REPLY_FOR = "reply_for";
constexpr const char *FLD_FILTER_BY = "filter_by";
@@ -60,6 +61,10 @@ namespace msg::usrmsg
constexpr const char *FLD_INPUTS = "inputs";
constexpr const char *FLD_BLOB = "blob";
constexpr const char *FLD_BLOBS = "blobs";
constexpr const char *FLD_EVENT = "event";
constexpr const char *FLD_LEDGER = "ledger";
constexpr const char *FLD_CHANNEL = "channel";
constexpr const char *FLD_ENABLED = "enabled";
// Message types
constexpr const char *MSGTYPE_USER_CHALLENGE = "user_challenge";
@@ -75,8 +80,10 @@ namespace msg::usrmsg
constexpr const char *MSGTYPE_LCL = "lcl";
constexpr const char *MSGTYPE_LCL_RESPONSE = "lcl_response";
constexpr const char *MSGTYPE_UNL_CHANGE = "unl_change";
constexpr const char *MSGTYPE_LEDGER_EVENT = "ledger_event";
constexpr const char *MSGTYPE_LEDGER_QUERY = "ledger_query";
constexpr const char *MSGTYPE_LEDGER_QUERY_RESULT = "ledger_query_result";
constexpr const char *MSGTYPE_SUBSCRIPTION = "subscription";
constexpr const char *MSGTYPE_UNKNOWN = "unknown";
// Values
@@ -92,6 +99,11 @@ namespace msg::usrmsg
constexpr const char *REASON_ALREADY_SUBMITTED = "already_submitted";
constexpr const char *REASON_ROUND_INPUTS_OVERFLOW = "round_inputs_overflow";
constexpr const char *QUERY_FILTER_BY_SEQ_NO = "seq_no";
constexpr const char *STR_TRUE = "true";
constexpr const char *STR_FALSE = "false";
constexpr const char *LEDGER_EVENT_LEDGER_CREATED = "ledger_created";
constexpr const char *LEDGER_EVENT_SYNC_STATUS = "sync_status";
} // namespace msg::usrmsg

View File

@@ -56,12 +56,28 @@ namespace msg::usrmsg
busrmsg::create_contract_output_container(msg, hash, outputs, hash_root, unl_sig, lcl_seq_no, lcl_hash);
}
void usrmsg_parser::create_unl_list_container(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list) const
void usrmsg_parser::create_unl_notification(std::vector<uint8_t> &msg, const std::set<std::string> &unl_list) const
{
if (protocol == util::PROTOCOL::JSON)
jusrmsg::create_unl_list_container(msg, unl_list);
jusrmsg::create_unl_notification(msg, unl_list);
else
busrmsg::create_unl_list_container(msg, unl_list);
busrmsg::create_unl_notification(msg, unl_list);
}
void usrmsg_parser::create_ledger_created_notification(std::vector<uint8_t> &msg, const ledger::ledger_record &ledger) const
{
if (protocol == util::PROTOCOL::JSON)
jusrmsg::create_ledger_created_notification(msg, ledger);
else
busrmsg::create_ledger_created_notification(msg, ledger);
}
void usrmsg_parser::create_sync_status_notification(std::vector<uint8_t> &msg, const bool in_sync) const
{
if (protocol == util::PROTOCOL::JSON)
jusrmsg::create_sync_status_notification(msg, in_sync);
else
busrmsg::create_sync_status_notification(msg, in_sync);
}
void usrmsg_parser::create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
@@ -114,6 +130,14 @@ namespace msg::usrmsg
return busrmsg::extract_input_container(input, nonce, max_ledger_seq_no, encoded_content);
}
int usrmsg_parser::extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled)
{
if (protocol == util::PROTOCOL::JSON)
return jusrmsg::extract_subscription_request(channel, enabled, jdoc);
else
return busrmsg::extract_subscription_request(channel, enabled, bdoc);
}
int usrmsg_parser::extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id) const
{
if (protocol == util::PROTOCOL::JSON)

View File

@@ -5,12 +5,10 @@
#include "../util/util.hpp"
#include "../util/merkle_hash_tree.hpp"
#include "../ledger/ledger_query.hpp"
#include "../usr/user_common.hpp"
namespace msg::usrmsg
{
// Forward declaration
class usrmsg_parser;
class usrmsg_parser
{
const util::PROTOCOL protocol;
@@ -33,7 +31,11 @@ namespace msg::usrmsg
const util::merkle_hash_node &hash_root, const std::vector<std::pair<std::string, std::string>> &unl_sig,
const uint64_t lcl_seq_no, std::string_view lcl_hash) const;
void create_unl_list_container(std::vector<uint8_t> &msg, const ::std::set<std::string> &unl_list) const;
void create_unl_notification(std::vector<uint8_t> &msg, const std::set<std::string> &unl_list) const;
void create_ledger_created_notification(std::vector<uint8_t> &msg, const ledger::ledger_record &ledger) const;
void create_sync_status_notification(std::vector<uint8_t> &msg, const bool in_sync) const;
void create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result) const;
@@ -49,6 +51,8 @@ namespace msg::usrmsg
int extract_input_container(std::string &input, uint64_t &nonce,
uint64_t &max_ledger_seq_no, std::string_view encoded_content) const;
int extract_subscription_request(usr::NOTIFICATION_CHANNEL &channel, bool &enabled);
int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id) const;
};

View File

@@ -10,7 +10,8 @@ namespace status
std::shared_mutex ledger_mutex;
util::sequence_hash lcl_id; // Last ledger id/hash pair.
ledger::ledger_record last_ledger; // Last ledger record that the node created.
bool is_in_sync = false; // Indicates whether this node is in sync with other nodes or not.
std::atomic<bool> in_sync = false; // Indicates whether this node is in sync with other nodes or not.
std::shared_mutex unl_mutex;
std::set<std::string> unl; // List of last reported unl binary pubkeys.
@@ -25,7 +26,9 @@ namespace status
// Not acquiring the mutex lock since this is called during startup only.
lcl_id = ledger_id;
last_ledger = ledger;
is_in_sync = true;
// We assume we are not in sync unless otherwise found that we are.
in_sync = false;
}
void ledger_created(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger)
@@ -33,13 +36,16 @@ namespace status
std::unique_lock lock(ledger_mutex);
lcl_id = ledger_id;
last_ledger = ledger;
is_in_sync = true; // Creating a ledger automatically means we are in sync.
event_queue.try_enqueue(ledger_created_event{ledger});
}
void sync_status_changed(const bool in_sync)
void sync_status_changed(const bool new_in_sync)
{
std::unique_lock lock(ledger_mutex);
is_in_sync = in_sync;
if (in_sync != new_in_sync)
{
in_sync = new_in_sync;
event_queue.try_enqueue(sync_status_change_event{new_in_sync});
}
}
const util::sequence_hash get_lcl_id()
@@ -48,6 +54,11 @@ namespace status
return lcl_id;
}
const bool is_in_sync()
{
return in_sync;
}
//----- UNL status
void init_unl(const std::set<std::string> &init_unl)

View File

@@ -13,8 +13,18 @@ namespace status
std::set<std::string> unl;
};
struct ledger_created_event
{
ledger::ledger_record ledger;
};
struct sync_status_change_event
{
bool in_sync = false;
};
// Represents any kind of change that has happened in the node.
typedef std::variant<unl_change_event> change_event;
typedef std::variant<unl_change_event, ledger_created_event, sync_status_change_event> change_event;
extern moodycamel::ConcurrentQueue<change_event> event_queue;
@@ -22,6 +32,7 @@ namespace status
void ledger_created(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger);
void sync_status_changed(const bool in_sync);
const util::sequence_hash get_lcl_id();
const bool is_in_sync();
void init_unl(const std::set<std::string> &init_unl);
void unl_changed(const std::set<std::string> &new_unl);

15
src/usr/user_common.hpp Normal file
View File

@@ -0,0 +1,15 @@
#ifndef _HP_USR_USER_COMMON_
#define _HP_USR_USER_COMMON_
namespace usr
{
// List of notification channels users can subscribe to.
enum NOTIFICATION_CHANNEL
{
UNL_CHANGE = 0,
LEDGER_EVENT = 1
};
} // namespace usr
#endif

View File

@@ -246,6 +246,16 @@ namespace usr
user.session.send(resp);
return 0;
}
else if (msg_type == msg::usrmsg::MSGTYPE_SUBSCRIPTION)
{
NOTIFICATION_CHANNEL channel;
bool enabled;
if (parser.extract_subscription_request(channel, enabled) == -1)
return -1;
user.subscriptions[channel] = enabled;
return 0;
}
else if (msg_type == msg::usrmsg::MSGTYPE_LEDGER_QUERY)
{
ledger::query::query_request req;
@@ -547,25 +557,50 @@ namespace usr
// Array to hold constructed message cache from each protocol.
std::vector<uint8_t> protocol_msgs[2];
if (ev.index() == 0) // UNL change event. Broadcast for all users.
if (ev.index() == 0) // UNL change event. Broadcast for subscribed users.
{
const status::unl_change_event &unl_ev = std::get<status::unl_change_event>(ev);
std::scoped_lock<std::mutex> lock(ctx.users_mutex);
for (auto &[sid, user] : ctx.users)
{
std::vector<uint8_t> &msg = protocol_msgs[user.protocol];
if (msg.empty()) // Construct the message with relevant protocol if not done so already.
if (user.subscriptions[NOTIFICATION_CHANNEL::UNL_CHANGE])
{
msg::usrmsg::usrmsg_parser parser(user.protocol);
parser.create_unl_list_container(msg, unl_ev.unl);
std::vector<uint8_t> &msg = protocol_msgs[user.protocol];
if (msg.empty()) // Construct the message with relevant protocol if not done so already.
{
msg::usrmsg::usrmsg_parser parser(user.protocol);
const status::unl_change_event &unl_ev = std::get<status::unl_change_event>(ev);
parser.create_unl_notification(msg, unl_ev.unl);
}
user.session.send(msg);
}
user.session.send(msg);
}
}
else if (ev.index() == 1 || ev.index() == 2) // Ledger events. Broadcast for subscribed users.
{
std::scoped_lock<std::mutex> lock(ctx.users_mutex);
for (auto &[sid, user] : ctx.users)
{
if (user.subscriptions[NOTIFICATION_CHANNEL::LEDGER_EVENT])
{
std::vector<uint8_t> &msg = protocol_msgs[user.protocol];
if (msg.empty()) // Construct the message with relevant protocol if not done so already.
{
msg::usrmsg::usrmsg_parser parser(user.protocol);
// Clear the caches for the next event.
protocol_msgs[util::PROTOCOL::JSON].clear();
protocol_msgs[util::PROTOCOL::BSON].clear();
if (ev.index() == 1) // Ledger created event.
{
const status::ledger_created_event &ledger_ev = std::get<status::ledger_created_event>(ev);
parser.create_ledger_created_notification(msg, ledger_ev.ledger);
}
else if (ev.index() == 2) // Sync status chnge event.
{
const status::sync_status_change_event &sync_ev = std::get<status::sync_status_change_event>(ev);
parser.create_sync_status_notification(msg, sync_ev.in_sync);
}
}
user.session.send(msg);
}
}
}
}
}

View File

@@ -11,6 +11,7 @@
#include "user_comm_server.hpp"
#include "user_session_handler.hpp"
#include "user_input.hpp"
#include "user_common.hpp"
/**
* Maintains the global user list with pending input outputs and manages user connections.
@@ -34,6 +35,9 @@ namespace usr
// Total input bytes collected which are pending to be subjected to consensus.
size_t collected_input_size = 0;
// User's notification subscription toggles.
bool subscriptions[2];
// Holds the websocket session of this user.
// We don't need to own the session object since the lifetime of user and session are coupled.
usr::user_comm_session &session;
@@ -48,6 +52,9 @@ namespace usr
connected_user(usr::user_comm_session &session, std::string_view pubkey, util::PROTOCOL protocol)
: session(session), pubkey(pubkey), protocol(protocol)
{
// Default subscriptions.
subscriptions[NOTIFICATION_CHANNEL::UNL_CHANGE] = false;
subscriptions[NOTIFICATION_CHANNEL::LEDGER_EVENT] = false;
}
};