mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Implemented socket message monitor infrastructure (#48)
* Completed adding message threshold * Removed unused cpp file * Completed review suggestions * Added default value check * Cleanup. * Added header definitions for socket monitor
This commit is contained in:
@@ -16,6 +16,7 @@ add_executable(hpcore
|
||||
src/sock/socket_client.cpp
|
||||
src/sock/socket_server.cpp
|
||||
src/sock/socket_session.cpp
|
||||
src/sock/socket_monitor.cpp
|
||||
src/p2p/peer_session_handler.cpp
|
||||
src/p2p/p2p.cpp
|
||||
src/usr/user_session_handler.cpp
|
||||
|
||||
116
examples/hpclient/malicious-client.js
Normal file
116
examples/hpclient/malicious-client.js
Normal file
@@ -0,0 +1,116 @@
|
||||
//
|
||||
// HotPocket client example code adopted from:
|
||||
// https://github.com/codetsunami/hotpocket/blob/master/hp_client.js
|
||||
//
|
||||
|
||||
const fs = require('fs')
|
||||
const ws_api = require('ws');
|
||||
const sodium = require('libsodium-wrappers')
|
||||
const readline = require('readline')
|
||||
|
||||
// sodium has a trigger when it's ready, we will wait and execute from there
|
||||
sodium.ready.then(main).catch((e) => { console.log(e) })
|
||||
|
||||
|
||||
function main() {
|
||||
|
||||
if (process.argv.length != 6) {
|
||||
console.log("Incorrect format");
|
||||
console.log("node client.js <ip_address> <port> <bytes_per_message> <interval>");
|
||||
process.exit();
|
||||
}
|
||||
|
||||
var args = process.argv.slice(2);
|
||||
var keys = sodium.crypto_sign_keypair()
|
||||
|
||||
|
||||
// check for client keys
|
||||
if (!fs.existsSync('.hp_client_keys')) {
|
||||
keys.privateKey = sodium.to_hex(keys.privateKey)
|
||||
keys.publicKey = sodium.to_hex(keys.publicKey)
|
||||
fs.writeFileSync('.hp_client_keys', JSON.stringify(keys))
|
||||
} else {
|
||||
keys = JSON.parse(fs.readFileSync('.hp_client_keys'))
|
||||
keys.privateKey = Uint8Array.from(Buffer.from(keys.privateKey, 'hex'))
|
||||
keys.publicKey = Uint8Array.from(Buffer.from(keys.publicKey, 'hex'))
|
||||
}
|
||||
|
||||
|
||||
var server = 'wss://localhost:8080'
|
||||
|
||||
if (process.argv.length == 3) server = 'wss://localhost:' + args[0]
|
||||
|
||||
if (process.argv.length == 4) server = 'wss://' + args[0] + ':' + args[1]
|
||||
|
||||
var ws = new ws_api(server, {
|
||||
rejectUnauthorized: false
|
||||
})
|
||||
|
||||
/* anatomy of a public challenge
|
||||
{
|
||||
hotpocket: 0.1,
|
||||
type: 'public_challenge',
|
||||
challenge: '<hex string>'
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
// if the console ctrl + c's us we should close ws gracefully
|
||||
process.once('SIGINT', function (code) {
|
||||
console.log('SIGINT received...');
|
||||
ws.close()
|
||||
});
|
||||
|
||||
ws.on('message', (m) => {
|
||||
console.log("-----Received raw message-----")
|
||||
console.log(m.toString())
|
||||
console.log("------------------------------")
|
||||
|
||||
try {
|
||||
m = JSON.parse(m)
|
||||
} catch (e) {
|
||||
return
|
||||
}
|
||||
|
||||
if (m.type != 'public_challenge') return
|
||||
|
||||
console.log("Received challenge message")
|
||||
console.log(m)
|
||||
|
||||
let pkhex = 'ed' + Buffer.from(keys.publicKey).toString('hex');
|
||||
console.log('My public key is: ' + pkhex);
|
||||
|
||||
// sign the challenge and send back the response
|
||||
var sigbytes = sodium.crypto_sign_detached(m.challenge, keys.privateKey);
|
||||
var response = {
|
||||
type: 'challenge_response',
|
||||
challenge: m.challenge,
|
||||
sig: Buffer.from(sigbytes).toString('hex'),
|
||||
pubkey: pkhex
|
||||
}
|
||||
|
||||
console.log('Sending challenge response...');
|
||||
ws.send(JSON.stringify(response));
|
||||
|
||||
setInterval(() => {
|
||||
var message = generateRandomMessage(args[2]);
|
||||
console.log("Message :" + message);
|
||||
ws.send(message);
|
||||
}, args[3]);
|
||||
|
||||
});
|
||||
|
||||
ws.on('close', () => {
|
||||
console.log('Server disconnected.');
|
||||
});
|
||||
}
|
||||
|
||||
function generateRandomMessage(length) {
|
||||
var result = '';
|
||||
var characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
|
||||
var charactersLength = characters.length;
|
||||
for (var i = 0; i < length; i++) {
|
||||
result += characters.charAt(Math.floor(Math.random() * charactersLength));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
24
src/conf.cpp
24
src/conf.cpp
@@ -100,9 +100,9 @@ int create_contract()
|
||||
cfg.roundtime = 1000;
|
||||
cfg.pubport = 8080;
|
||||
cfg.pubmaxsize = 65536;
|
||||
cfg.pubmaxmpm = 100;
|
||||
cfg.pubmaxcpm = 1000;
|
||||
cfg.peermaxsize = 65536;
|
||||
cfg.peermaxmpm = 1000;
|
||||
cfg.peermaxcpm = 1000;
|
||||
|
||||
#ifndef NDEBUG
|
||||
cfg.loglevel = "debug";
|
||||
@@ -233,9 +233,9 @@ int load_config()
|
||||
cfg.roundtime = d["roundtime"].GetInt();
|
||||
cfg.pubport = d["pubport"].GetInt();
|
||||
cfg.pubmaxsize = d["pubmaxsize"].GetInt();
|
||||
cfg.pubmaxmpm = d["pubmaxmpm"].GetInt();
|
||||
cfg.pubmaxcpm = d["pubmaxcpm"].GetInt();
|
||||
cfg.peermaxsize = d["peermaxsize"].GetInt();
|
||||
cfg.peermaxmpm = d["peermaxmpm"].GetInt();
|
||||
cfg.peermaxcpm = d["peermaxcpm"].GetInt();
|
||||
|
||||
cfg.loglevel = d["loglevel"].GetString();
|
||||
cfg.loggers.clear();
|
||||
@@ -298,9 +298,9 @@ int save_config()
|
||||
d.AddMember("roundtime", cfg.roundtime, allocator);
|
||||
d.AddMember("pubport", cfg.pubport, allocator);
|
||||
d.AddMember("pubmaxsize", cfg.pubmaxsize, allocator);
|
||||
d.AddMember("pubmaxmpm", cfg.pubmaxmpm, allocator);
|
||||
d.AddMember("pubmaxcpm", cfg.pubmaxcpm, allocator);
|
||||
d.AddMember("peermaxsize", cfg.peermaxsize, allocator);
|
||||
d.AddMember("peermaxmpm", cfg.peermaxmpm, allocator);
|
||||
d.AddMember("peermaxcpm", cfg.peermaxcpm, allocator);
|
||||
|
||||
d.AddMember("loglevel", rapidjson::StringRef(cfg.loglevel.data()), allocator);
|
||||
rapidjson::Value loggers(rapidjson::kArrayType);
|
||||
@@ -403,8 +403,8 @@ int validate_config()
|
||||
|
||||
// Other required fields.
|
||||
if (cfg.binary.empty() || cfg.listenip.empty() ||
|
||||
cfg.peerport == 0 || cfg.roundtime == 0 || cfg.pubport == 0 || cfg.pubmaxsize == 0 || cfg.pubmaxmpm == 0 || cfg.peermaxsize == 0 ||
|
||||
cfg.peermaxmpm == 0 || cfg.loglevel.empty() || cfg.loggers.empty())
|
||||
cfg.peerport == 0 || cfg.roundtime == 0 || cfg.pubport == 0 || cfg.pubmaxsize == 0 || cfg.pubmaxcpm == 0 || cfg.peermaxsize == 0 ||
|
||||
cfg.peermaxcpm == 0 || cfg.loglevel.empty() || cfg.loggers.empty())
|
||||
{
|
||||
std::cout << "Required configuration fields missing at " << ctx.configFile << std::endl;
|
||||
return -1;
|
||||
@@ -489,8 +489,8 @@ int is_schema_valid(rapidjson::Document &d)
|
||||
"{"
|
||||
"\"type\": \"object\","
|
||||
"\"required\": [ \"version\", \"pubkeyhex\", \"seckeyhex\", \"binary\", \"binargs\", \"listenip\""
|
||||
", \"peers\", \"unl\", \"peerport\", \"roundtime\", \"pubport\", \"pubmaxsize\", \"pubmaxmpm\""
|
||||
", \"peermaxsize\", \"peermaxmpm\", \"loglevel\", \"loggers\" ],"
|
||||
", \"peers\", \"unl\", \"peerport\", \"roundtime\", \"pubport\", \"pubmaxsize\", \"pubmaxcpm\""
|
||||
", \"peermaxsize\", \"peermaxcpm\", \"loglevel\", \"loggers\" ],"
|
||||
"\"properties\": {"
|
||||
"\"version\": { \"type\": \"string\" },"
|
||||
"\"pubkeyhex\": { \"type\": \"string\" },"
|
||||
@@ -510,9 +510,9 @@ int is_schema_valid(rapidjson::Document &d)
|
||||
"\"roundtime\": { \"type\": \"integer\" },"
|
||||
"\"pubport\": { \"type\": \"integer\" },"
|
||||
"\"pubmaxsize\": { \"type\": \"integer\" },"
|
||||
"\"pubmaxmpm\": { \"type\": \"integer\" },"
|
||||
"\"pubmaxcpm\": { \"type\": \"integer\" },"
|
||||
"\"peermaxsize\": { \"type\": \"integer\" },"
|
||||
"\"peermaxmpm\": { \"type\": \"integer\" },"
|
||||
"\"peermaxcpm\": { \"type\": \"integer\" },"
|
||||
"\"loglevel\": { \"type\": \"string\" },"
|
||||
"\"loggers\": {"
|
||||
"\"type\": \"array\","
|
||||
|
||||
14
src/conf.hpp
14
src/conf.hpp
@@ -49,13 +49,13 @@ struct contract_config
|
||||
std::string listenip; // The IPs to listen on for incoming connections
|
||||
std::unordered_map<std::string, ip_port_pair> peers; // Map of peers keyed by "<ip address>:<port>" concatenated format
|
||||
std::unordered_set<std::string> unl; // Unique node list (list of binary public keys)
|
||||
std::uint16_t peerport; // Listening port for peer connections
|
||||
std::uint16_t roundtime; // Consensus round time in ms
|
||||
std::uint16_t pubport; // Listening port for public user connections
|
||||
std::uint64_t pubmaxsize; // User message max size in bytes
|
||||
std::uint16_t pubmaxmpm; // User message rate (in minutes)
|
||||
std::uint64_t peermaxsize; // Peer message max size in bytes
|
||||
std::uint64_t peermaxmpm; // Peer message rate (in minutes)
|
||||
uint16_t peerport; // Listening port for peer connections
|
||||
uint16_t roundtime; // Consensus round time in ms
|
||||
uint16_t pubport; // Listening port for public user connections
|
||||
uint64_t pubmaxsize; // User message max size in bytes
|
||||
uint16_t pubmaxcpm; // User message rate (characters(bytes) per minute)
|
||||
uint64_t peermaxsize; // Peer message max size in bytes
|
||||
uint64_t peermaxcpm; // Peer message rate (characters(bytes) per minute)
|
||||
std::string loglevel; // Log severity level (debug, info, warn, error)
|
||||
std::unordered_set<std::string> loggers; // List of enabled loggers (console, file)
|
||||
};
|
||||
|
||||
@@ -70,7 +70,7 @@ int broadcast_proposal(const p2p::proposal &p);
|
||||
|
||||
void check_majority_stage(bool &is_desync, bool &should_reset, int8_t &majority_stage, vote_counter &votes);
|
||||
|
||||
void run_contract_binary(std::int64_t time);
|
||||
void run_contract_binary(int64_t time);
|
||||
|
||||
} // namespace cons
|
||||
|
||||
|
||||
@@ -80,6 +80,7 @@ void start_peer_connections()
|
||||
//setting up the message max size. Retrieve it from config
|
||||
// At the moment same settings are used to initialize a new server and client
|
||||
sess_opts.max_message_size = conf::cfg.peermaxsize;
|
||||
sess_opts.max_bytes_per_minute = conf::cfg.peermaxcpm;
|
||||
|
||||
// Start listening to peers
|
||||
std::make_shared<sock::socket_server<peer_outbound_message>>(
|
||||
|
||||
31
src/sock/socket_monitor.cpp
Normal file
31
src/sock/socket_monitor.cpp
Normal file
@@ -0,0 +1,31 @@
|
||||
#include "socket_monitor.hpp"
|
||||
#include "../p2p/peer_session_handler.hpp"
|
||||
#include "../usr/user_session_handler.hpp"
|
||||
|
||||
namespace sock
|
||||
{
|
||||
|
||||
/**
|
||||
* Act upon exceeding various thresholds in socket communication
|
||||
*
|
||||
* @param threshold Type of threshold which has exceeded.
|
||||
* @param threshold_limit Threshold limit at the time of exceedance.
|
||||
* @param session Websocket session which exceeds the threshold.
|
||||
*/
|
||||
template <class T>
|
||||
void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session<T> *session)
|
||||
{
|
||||
if (threshold == util::SESSION_THRESHOLDS::MAX_BYTES_PER_MINUTE)
|
||||
{
|
||||
// Can act accordingly
|
||||
session->close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time
|
||||
*/
|
||||
template void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session<p2p::peer_outbound_message> *session);
|
||||
|
||||
template void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session<usr::user_outbound_message> *session);
|
||||
} // namespace sock
|
||||
13
src/sock/socket_monitor.hpp
Normal file
13
src/sock/socket_monitor.hpp
Normal file
@@ -0,0 +1,13 @@
|
||||
#ifndef _SOCK_MONITOR_H_
|
||||
#define _SOCK_MONITOR_H_
|
||||
|
||||
#include "../util.hpp"
|
||||
#include "socket_session.hpp"
|
||||
|
||||
namespace sock{
|
||||
|
||||
template <class T>
|
||||
void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session<T> *session);
|
||||
}
|
||||
|
||||
#endif
|
||||
@@ -1,10 +1,12 @@
|
||||
#include "socket_session.hpp"
|
||||
#include "../p2p/peer_session_handler.hpp"
|
||||
#include "../usr/user_session_handler.hpp"
|
||||
#include "socket_monitor.hpp"
|
||||
|
||||
namespace sock{
|
||||
namespace sock
|
||||
{
|
||||
|
||||
template <class T>
|
||||
template <class T>
|
||||
socket_session<T>::socket_session(websocket::stream<beast::ssl_stream<beast::tcp_stream>> websocket, socket_session_handler<T> &sess_handler)
|
||||
: ws(std::move(websocket)), sess_handler(sess_handler)
|
||||
{
|
||||
@@ -23,11 +25,22 @@ socket_session<T>::~socket_session()
|
||||
* protocol failure
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::set_message_max_size(std::uint64_t size)
|
||||
void socket_session<T>::set_message_max_size(uint64_t size)
|
||||
{
|
||||
ws.read_message_max(size);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set thresholds to the socket session
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t intervalms)
|
||||
{
|
||||
thresholds[threshold_type].counter_value = 0;
|
||||
thresholds[threshold_type].intervalms = intervalms;
|
||||
thresholds[threshold_type].threshold_limit = threshold_limit;
|
||||
}
|
||||
|
||||
//port and address will be used to identify from which remote party the message recieved in the handler
|
||||
template <class T>
|
||||
void socket_session<T>::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts)
|
||||
@@ -40,6 +53,12 @@ void socket_session<T>::run(const std::string &&address, const std::string &&por
|
||||
set_message_max_size(sess_opts.max_message_size);
|
||||
}
|
||||
|
||||
// Create new session_threshold and insert it to thresholds array
|
||||
// Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector
|
||||
// since enum's value is used as index in the vector to update vector values
|
||||
session_threshold max_byte_per_message_threshold{sess_opts.max_bytes_per_minute, 0, 0, 60000};
|
||||
thresholds.push_back(std::move(max_byte_per_message_threshold));
|
||||
|
||||
if (is_server_session)
|
||||
{
|
||||
/**
|
||||
@@ -94,7 +113,6 @@ void socket_session<T>::on_ssl_handshake(error_code ec)
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
ws.set_option(
|
||||
websocket::stream_base::timeout::suggested(
|
||||
beast::role_type::client));
|
||||
@@ -149,6 +167,8 @@ void socket_session<T>::on_read(error_code ec, std::size_t)
|
||||
return fail(ec, "read");
|
||||
}
|
||||
|
||||
increment(util::SESSION_THRESHOLDS::MAX_BYTES_PER_MINUTE, buffer.size());
|
||||
|
||||
// Wrap the buffer data in a string_view and call session handler.
|
||||
// We DO NOT transfer ownership of buffer data to the session handler. It should
|
||||
// read and process the message and we will clear the buffer after its done with it.
|
||||
@@ -168,6 +188,44 @@ void socket_session<T>::on_read(error_code ec, std::size_t)
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Increment the provided thresholds counter value with the provided amount and validate it
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount)
|
||||
{
|
||||
sock::session_threshold &t = thresholds[threshold_type];
|
||||
|
||||
// Ignore the counter if limit is set as 0.
|
||||
if (t.threshold_limit == 0)
|
||||
return;
|
||||
|
||||
uint64_t time_now = util::get_epoch_milliseconds();
|
||||
|
||||
t.counter_value += amount;
|
||||
if (t.timestamp == 0)
|
||||
{
|
||||
t.timestamp = time_now;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto elapsed_time = time_now - t.timestamp;
|
||||
if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit)
|
||||
{
|
||||
t.timestamp = 0;
|
||||
t.counter_value = 0;
|
||||
|
||||
// Invoke the threshold monitor so any actions will be performed.
|
||||
threshold_monitor(threshold_type, t.threshold_limit, this);
|
||||
}
|
||||
else if (elapsed_time > t.intervalms)
|
||||
{
|
||||
t.timestamp = time_now;
|
||||
t.counter_value = amount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Send message through an active websocket connection
|
||||
*/
|
||||
@@ -238,7 +296,7 @@ void socket_session<T>::close()
|
||||
*/
|
||||
//type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method
|
||||
template <class T>
|
||||
void socket_session<T>::on_close(error_code ec, std::int8_t type)
|
||||
void socket_session<T>::on_close(error_code ec, int8_t type)
|
||||
{
|
||||
if (type == 1)
|
||||
return;
|
||||
@@ -273,21 +331,25 @@ void socket_session<T>::fail(error_code ec, char const *what)
|
||||
|
||||
/**
|
||||
* Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time
|
||||
*/
|
||||
*/
|
||||
template socket_session<p2p::peer_outbound_message>::socket_session(websocket::stream<beast::ssl_stream<beast::tcp_stream>> websocket, socket_session_handler<p2p::peer_outbound_message> &sess_handler);
|
||||
template socket_session<p2p::peer_outbound_message>::~socket_session();
|
||||
template void socket_session<p2p::peer_outbound_message>::set_message_max_size(std::uint64_t size);
|
||||
template void socket_session<p2p::peer_outbound_message>::set_message_max_size(uint64_t size);
|
||||
template void socket_session<p2p::peer_outbound_message>::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts);
|
||||
template void socket_session<p2p::peer_outbound_message>::send(p2p::peer_outbound_message msg);
|
||||
template void socket_session<p2p::peer_outbound_message>::set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t intervalms);
|
||||
template void socket_session<p2p::peer_outbound_message>::increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount);
|
||||
template void socket_session<p2p::peer_outbound_message>::init_uniqueid();
|
||||
template void socket_session<p2p::peer_outbound_message>::close();
|
||||
|
||||
template socket_session<usr::user_outbound_message>::socket_session(websocket::stream<beast::ssl_stream<beast::tcp_stream>> websocket, socket_session_handler<usr::user_outbound_message> &sess_handler);
|
||||
template socket_session<usr::user_outbound_message>::~socket_session();
|
||||
template void socket_session<usr::user_outbound_message>::set_message_max_size(std::uint64_t size);
|
||||
template void socket_session<usr::user_outbound_message>::set_message_max_size(uint64_t size);
|
||||
template void socket_session<usr::user_outbound_message>::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts);
|
||||
template void socket_session<usr::user_outbound_message>::send(usr::user_outbound_message msg);
|
||||
template void socket_session<usr::user_outbound_message>::set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t intervalms);
|
||||
template void socket_session<usr::user_outbound_message>::increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount);
|
||||
template void socket_session<usr::user_outbound_message>::init_uniqueid();
|
||||
template void socket_session<usr::user_outbound_message>::close();
|
||||
|
||||
}
|
||||
} // namespace sock
|
||||
@@ -4,6 +4,7 @@
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
#include <bitset>
|
||||
#include <unordered_map>
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/beast.hpp>
|
||||
#include <boost/beast/core.hpp>
|
||||
@@ -40,10 +41,29 @@ public:
|
||||
virtual std::string_view buffer() = 0;
|
||||
};
|
||||
|
||||
/*
|
||||
* Use this to keep in track of different thresholds which we need to deal with. e.g - maximum amount of bytes allowed per minute through a session
|
||||
* threshold_limit - Maximum threshold value which is allowed
|
||||
* counter_value - Counter which keeps incrementing per every message
|
||||
* timestamp - Timestamp when counter value changes
|
||||
* intervalms - Time interval in miliseconds in which the threshold and the counter value should be compared
|
||||
*/
|
||||
struct session_threshold
|
||||
{
|
||||
uint64_t threshold_limit;
|
||||
uint64_t counter_value;
|
||||
uint64_t timestamp;
|
||||
uint64_t intervalms;
|
||||
|
||||
// session_threshold(uint64_t threshold_limit, uint64_t intervalms)
|
||||
// : threshold_limit(threshold_limit), intervalms(intervalms), counter_value(0), timestamp(0) {}
|
||||
};
|
||||
|
||||
// Use this to feed the session with default options from the config file
|
||||
struct session_options
|
||||
{
|
||||
std::uint64_t max_message_size;
|
||||
uint64_t max_message_size;
|
||||
uint64_t max_bytes_per_minute;
|
||||
};
|
||||
|
||||
//Forward Declaration
|
||||
@@ -60,6 +80,7 @@ class socket_session : public std::enable_shared_from_this<socket_session<T>>
|
||||
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws; // websocket stream used send an recieve messages
|
||||
std::vector<T> queue; // used to store messages temporarily until it is sent to the relevant party
|
||||
socket_session_handler<T> &sess_handler; // handler passed to gain access to websocket events
|
||||
std::vector<session_threshold> thresholds; // track down various thresholdsls
|
||||
|
||||
void fail(error_code ec, char const *what);
|
||||
|
||||
@@ -71,7 +92,8 @@ class socket_session : public std::enable_shared_from_this<socket_session<T>>
|
||||
|
||||
void on_write(error_code ec, std::size_t bytes_transferred);
|
||||
|
||||
void on_close(error_code ec, std::int8_t type);
|
||||
void on_close(error_code ec, int8_t type);
|
||||
|
||||
|
||||
public:
|
||||
socket_session(websocket::stream<beast::ssl_stream<beast::tcp_stream>> websocket, socket_session_handler<T> &sess_handler);
|
||||
@@ -100,7 +122,11 @@ public:
|
||||
|
||||
void send(T msg);
|
||||
|
||||
void set_message_max_size(std::uint64_t size);
|
||||
void set_message_max_size(uint64_t size);
|
||||
|
||||
void set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t interval);
|
||||
|
||||
void increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount);
|
||||
|
||||
// When called, initializes the unique id string for this session.
|
||||
void init_uniqueid();
|
||||
@@ -108,7 +134,5 @@ public:
|
||||
void close();
|
||||
};
|
||||
|
||||
|
||||
|
||||
} // namespace sock
|
||||
#endif
|
||||
|
||||
@@ -270,6 +270,7 @@ void start_listening()
|
||||
|
||||
auto address = net::ip::make_address(conf::cfg.listenip);
|
||||
sess_opts.max_message_size = conf::cfg.pubmaxsize;
|
||||
sess_opts.max_bytes_per_minute = conf::cfg.pubmaxcpm;
|
||||
|
||||
std::make_shared<sock::socket_server<user_outbound_message>>(
|
||||
ioc,
|
||||
|
||||
@@ -37,6 +37,14 @@ enum SESSION_FLAG
|
||||
USER_AUTHED = 2
|
||||
};
|
||||
|
||||
/**
|
||||
* Enum used to track down various thresholds used in socket communication
|
||||
*/
|
||||
enum SESSION_THRESHOLDS
|
||||
{
|
||||
MAX_BYTES_PER_MINUTE = 0
|
||||
};
|
||||
|
||||
int bin2hex(std::string &encoded_string, const unsigned char *bin, size_t bin_len);
|
||||
|
||||
int hex2bin(unsigned char *decoded, size_t decoded_len, std::string_view hex_str);
|
||||
|
||||
Reference in New Issue
Block a user