Restructured user message handling.

This commit is contained in:
Ravin Perera
2019-10-31 11:57:53 +05:30
parent 93d4abfd2a
commit a51ec4a030
10 changed files with 310 additions and 287 deletions

View File

@@ -1,11 +1,10 @@
#include <cstdio>
#include <iostream>
#include <unistd.h>
#include <rapidjson/document.h>
#include <sodium.h>
#include <boost/thread/thread.hpp>
#include "usr.hpp"
#include "user_session_handler.hpp"
#include "../jsonschema/usrmsg_helpers.hpp"
#include "../sock/socket_server.hpp"
#include "../sock/socket_session_handler.hpp"
#include "../util.hpp"
@@ -13,65 +12,16 @@
#include "../crypto.hpp"
#include "../hplog.hpp"
namespace jusrmsg = jsonschema::usrmsg;
namespace usr
{
// The SSL context is required, and holds certificates
ssl::context ctx{ssl::context::tlsv13};
// Holds global connected-users and related objects.
connected_context ctx;
/**
* Connected (authenticated) user list. (Exposed to other sub systems)
* Map key: User socket session id (<ip:port>)
*/
std::unordered_map<std::string, usr::connected_user> users;
std::mutex users_mutex; // Mutex for users access race conditions.
/**
* Holds set of connected user session ids and public keys for lookups.
* This is used for pubkey duplicate checks as well.
* Map key: User binary pubkey
*/
std::unordered_map<std::string, const std::string> sessionids;
/**
* Keep track of verification-pending challenges issued to newly connected users.
* Map key: User socket session id (<ip:port>)
*/
std::unordered_map<std::string, const std::string> pending_challenges;
/**
* User session handler instance. This instance's methods will be fired for any user socket activity.
*/
usr::user_session_handler global_usr_session_handler;
/**
* The IO context used by the websocket listener. (not exposed out of this namespace)
*/
net::io_context ioc;
/**
* The thread the websocket listener is running on. (not exposed out of this namespace)
*/
std::thread listener_thread;
/**
* Used to pass down the default settings to the socket session
*/
sock::session_options sess_opts;
// Challenge response fields.
// These fields are used on challenge response validation.
static const char *CHALLENGE_RESP_TYPE = "type";
static const char *CHALLENGE_RESP_CHALLENGE = "challenge";
static const char *CHALLENGE_RESP_SIG = "sig";
static const char *CHALLENGE_RESP_PUBKEY = "pubkey";
// Message type for the user challenge.
static const char *CHALLENGE_MSGTYPE = "public_challenge";
// Message type for the user challenge response.
static const char *CHALLENGE_RESP_MSGTYPE = "challenge_response";
// Length of user random challenge bytes.
static const size_t CHALLENGE_LEN = 16;
// Holds objects used by socket listener.
listener_context listener_ctx;
/**
* Initializes the usr subsystem. Must be called once during application startup.
@@ -81,129 +31,83 @@ int init()
{
// Start listening for incoming user connections.
start_listening();
return 0;
}
/**
* Free any resources used by usr subsystem (eg. socket listeners).
*/
void deinit()
std::string issue_challenge(const std::string sessionid)
{
stop_listening();
std::string msgstr;
std::string challengehex;
jusrmsg::create_user_challenge(msgstr, challengehex);
// Create an entry in pending_challenges for later tracking upon challenge response.
ctx.pending_challenges.try_emplace(std::move(sessionid), challengehex);
return msgstr;
}
/**
* Constructs user challenge message json and the challenge string required for
* initial user challenge handshake. This gets called when a user gets establishes
* a web sockets connection to HP.
*
* @param msg String reference to copy the generated json message string into.
* Message format:
* {
* "version": "<HP version>",
* "type": "public_challenge",
* "challenge": "<hex challenge string>"
* }
* @param challenge String reference to copy the generated hex challenge string into.
*/
void create_user_challenge(std::string &msg, std::string &challengehex)
bool verify_challenge(std::string_view message, sock::socket_session<user_outbound_message> *session)
{
//Use libsodium to generate the random challenge bytes.
unsigned char challenge_bytes[CHALLENGE_LEN];
randombytes_buf(challenge_bytes, CHALLENGE_LEN);
// The received message must be the challenge response. We need to verify it.
auto itr = ctx.pending_challenges.find(session->uniqueid);
if (itr == ctx.pending_challenges.end())
{
LOG_DBG << "No challenge found for the session " << session->uniqueid;
return false;
}
//We pass the hex challenge string separately to the caller even though
//we also include it in the challenge msg as well.
std::string userpubkeyhex;
std::string_view original_challenge = itr->second;
if (jusrmsg::verify_user_challenge_response(userpubkeyhex, message, original_challenge) == 0)
{
// Challenge singature verification successful.
util::bin2hex(challengehex, challenge_bytes, CHALLENGE_LEN);
// Decode hex pubkey and get binary pubkey. We are only going to keep
// the binary pubkey due to reduced memory footprint.
std::string userpubkey;
userpubkey.resize(userpubkeyhex.length() / 2);
util::hex2bin(
reinterpret_cast<unsigned char *>(userpubkey.data()),
userpubkey.length(),
userpubkeyhex);
//Construct the challenge msg json.
// We do not use RapidJson here in favour of performance because this is a simple json message.
// Now check whether this user public key is duplicate.
if (ctx.sessionids.count(userpubkey) == 0)
{
// All good. Unique public key.
// Promote the connection from pending-challenges to authenticated users.
// Since we know the rough size of the challenge massage we reserve adequate amount for the holder.
// Only Hot Pocket version number is variable length. Therefore message size is roughly 95 bytes
// so allocating 128bits for heap padding.
msg.reserve(128);
msg.append("{\"version\":\"")
.append(util::HP_VERSION)
.append("\",\"type\":\"public_challenge\",\"challenge\":\"")
.append(challengehex)
.append("\"}");
session->flags.reset(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag
session->flags.set(util::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag
add_user(session, userpubkey); // Add the user to the global authed user list
ctx.pending_challenges.erase(session->uniqueid); // Remove the stored challenge
LOG_INFO << "User connection " << session->uniqueid << " authenticated. Public key "
<< userpubkeyhex;
return true;
}
else
{
LOG_INFO << "Duplicate user public key " << session->uniqueid;
}
}
else
{
LOG_INFO << "Challenge verification failed " << session->uniqueid;
}
return false;
}
/**
* Verifies the user challenge response with the original challenge issued to the user
* and the user public key contained in the response.
*
* @param extracted_pubkeyhex The hex public key extracted from the response.
* @param response The response bytes to verify. This will be parsed as json.
* Accepted response format:
* {
* "type": "challenge_response",
* "challenge": "<original hex challenge the user received>",
* "sig": "<hex signature of the challenge>",
* "pubkey": "<hex public key of the user>"
* }
* @param original_challenge The original hex challenge string issued to the user.
* @return 0 if challenge response is verified. -1 if challenge not met or an error occurs.
*/
int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string_view response, std::string_view original_challenge)
void handle_user_message(connected_user &user, std::string_view message)
{
// We load response raw bytes into json document.
rapidjson::Document d;
// Because we project the response message directly from the binary socket buffer in a zero-copy manner, the response
// string is not null terminated. 'kParseStopWhenDoneFlag' avoids rapidjson error in this case.
d.Parse<rapidjson::kParseStopWhenDoneFlag>(response.data());
if (d.HasParseError())
{
LOG_INFO << "Challenge response json parsing failed.";
return -1;
std::lock_guard<std::mutex> lock(ctx.users_mutex);
//Add to the hashed input buffer list.
user.inputs.push_back(util::hash_buffer(message, user.pubkey));
}
// Validate msg type.
if (!d.HasMember(CHALLENGE_RESP_TYPE) || d[CHALLENGE_RESP_TYPE] != CHALLENGE_RESP_MSGTYPE)
{
LOG_INFO << "User challenge response type invalid. 'challenge_response' expected.";
return -1;
}
// Compare the response challenge string with the original issued challenge.
if (!d.HasMember(CHALLENGE_RESP_CHALLENGE) || d[CHALLENGE_RESP_CHALLENGE] != original_challenge.data())
{
LOG_INFO << "User challenge response challenge invalid.";
return -1;
}
// Check for the 'sig' field existence.
if (!d.HasMember(CHALLENGE_RESP_SIG) || !d[CHALLENGE_RESP_SIG].IsString())
{
LOG_INFO << "User challenge response signature invalid.";
return -1;
}
// Check for the 'pubkey' field existence.
if (!d.HasMember(CHALLENGE_RESP_PUBKEY) || !d[CHALLENGE_RESP_PUBKEY].IsString())
{
LOG_INFO << "User challenge response public key invalid.";
return -1;
}
// Verify the challenge signature. We do this last due to signature verification cost.
std::string_view pubkeysv = util::getsv(d[CHALLENGE_RESP_PUBKEY]);
if (crypto::verify_hex(
original_challenge,
util::getsv(d[CHALLENGE_RESP_SIG]),
pubkeysv) != 0)
{
LOG_INFO << "User challenge response signature verification failed.";
return -1;
}
extracted_pubkeyhex = pubkeysv;
return 0;
LOG_DBG << "Collected " << message.length() << " bytes from user";
}
/**
@@ -217,19 +121,19 @@ int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string
int add_user(sock::socket_session<user_outbound_message> *session, const std::string &pubkey)
{
const std::string &sessionid = session->uniqueid;
if (users.count(sessionid) == 1)
if (ctx.users.count(sessionid) == 1)
{
LOG_INFO << sessionid << " already exist. Cannot add user.";
return -1;
}
{
std::lock_guard<std::mutex> lock(users_mutex);
users.emplace(sessionid, usr::connected_user(session, pubkey));
std::lock_guard<std::mutex> lock(ctx.users_mutex);
ctx.users.emplace(sessionid, usr::connected_user(session, pubkey));
}
// Populate sessionid map so we can lookup by user pubkey.
sessionids.try_emplace(pubkey, sessionid);
ctx.sessionids.try_emplace(pubkey, sessionid);
return 0;
}
@@ -243,9 +147,9 @@ int add_user(sock::socket_session<user_outbound_message> *session, const std::st
*/
int remove_user(const std::string &sessionid)
{
auto itr = users.find(sessionid);
auto itr = ctx.users.find(sessionid);
if (itr == users.end())
if (itr == ctx.users.end())
{
LOG_INFO << sessionid << " does not exist. Cannot remove user.";
return -1;
@@ -254,11 +158,11 @@ int remove_user(const std::string &sessionid)
usr::connected_user &user = itr->second;
{
std::lock_guard<std::mutex> lock(users_mutex);
sessionids.erase(user.pubkey);
std::lock_guard<std::mutex> lock(ctx.users_mutex);
ctx.sessionids.erase(user.pubkey);
}
users.erase(itr);
ctx.users.erase(itr);
return 0;
}
@@ -269,28 +173,20 @@ void start_listening()
{
auto address = net::ip::make_address(conf::cfg.listenip);
sess_opts.max_message_size = conf::cfg.pubmaxsize;
sess_opts.max_bytes_per_minute = conf::cfg.pubmaxcpm;
listener_ctx.sess_opts.max_message_size = conf::cfg.pubmaxsize;
listener_ctx.sess_opts.max_bytes_per_minute = conf::cfg.pubmaxcpm;
std::make_shared<sock::socket_server<user_outbound_message>>(
ioc,
ctx,
listener_ctx.ioc,
listener_ctx.ssl_ctx,
tcp::endpoint{address, conf::cfg.pubport},
global_usr_session_handler,
sess_opts)
listener_ctx.global_usr_session_handler,
listener_ctx.sess_opts)
->run();
listener_thread = std::thread([&] { ioc.run(); });
listener_ctx.listener_thread = std::thread([&] { listener_ctx.ioc.run(); });
LOG_INFO << "Started listening for incoming user connections...";
}
/**
* Stops listening for incoming connections.
*/
void stop_listening()
{
//TODO
}
} // namespace usr