Status tracking infrastructure. (#317)

* Added node's current status information tracker.
* Used the status tracker for responding to status messages.
* Used change-event notifications to broadcast UNL change event.
This commit is contained in:
Ravin Perera
2021-06-04 15:08:10 +05:30
committed by GitHub
parent 07962bc3d5
commit 33bd63ac64
34 changed files with 414 additions and 211 deletions

View File

@@ -3,12 +3,14 @@
#include "../msg/usrmsg_parser.hpp"
#include "../msg/usrmsg_common.hpp"
#include "../util/util.hpp"
#include "../util/sequence_hash.hpp"
#include "../conf.hpp"
#include "../crypto.hpp"
#include "../hplog.hpp"
#include "../ledger/ledger.hpp"
#include "../util/buffer_store.hpp"
#include "../hpfs/hpfs_mount.hpp"
#include "../status.hpp"
#include "usr.hpp"
#include "user_session_handler.hpp"
#include "user_comm_session.hpp"
@@ -174,7 +176,7 @@ namespace usr
uint64_t max_ledger_seq_no;
if (parser.extract_input_container(input_data, nonce, max_ledger_seq_no, input_container) != -1)
{
const p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id();
const util::sequence_hash lcl_id = ledger::ctx.get_lcl_id();
// Ignore the input if the max ledger seq number specified is beyond the max offeset.
if (conf::cfg.contract.max_input_ledger_offset != 0 && max_ledger_seq_no > lcl_id.seq_no + conf::cfg.contract.max_input_ledger_offset)
{
@@ -233,8 +235,7 @@ namespace usr
else if (msg_type == msg::usrmsg::MSGTYPE_STAT)
{
std::vector<uint8_t> resp;
const p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id();
parser.create_status_response(resp, lcl_id.seq_no, lcl_id.hash.to_string_view());
parser.create_status_response(resp);
user.session.send(resp);
return 0;
}
@@ -529,22 +530,36 @@ namespace usr
}
/**
* Send unl list to all the connected users.
* @param unl_list Set of unl pubkeys.
*/
void announce_unl_list(const std::set<std::string> &unl_list)
* Sends any change event notifications to relevant users who are currently connected to the node.
*/
void dispatch_change_events()
{
std::scoped_lock<std::mutex> lock(ctx.users_mutex);
for (const auto &user : ctx.users)
status::change_event ev;
while (status::event_queue.try_dequeue(ev))
{
const usr::connected_user &connected_user = user.second;
msg::usrmsg::usrmsg_parser parser(connected_user.protocol);
// Array to hold constructed message cache from each protocol.
std::vector<uint8_t> protocol_msgs[2];
std::vector<uint8_t> msg;
parser.create_unl_list_container(msg, unl_list);
if (ev.index() == 0) // UNL change event. Broadcast for all users.
{
const status::unl_change_event &unl_ev = std::get<status::unl_change_event>(ev);
connected_user.session.send(msg);
std::scoped_lock<std::mutex> lock(ctx.users_mutex);
for (auto &[sid, user] : ctx.users)
{
std::vector<uint8_t> &msg = protocol_msgs[user.protocol];
if (msg.empty()) // Construct the message with relevant protocol if not done so already.
{
msg::usrmsg::usrmsg_parser parser(user.protocol);
parser.create_unl_list_container(msg, unl_ev.unl);
}
user.session.send(msg);
}
// Clear the caches for the next event.
protocol_msgs[util::PROTOCOL::JSON].clear();
protocol_msgs[util::PROTOCOL::BSON].clear();
}
}
}