Added element limits for queue and lists (#162)

This commit is contained in:
Chalith Desaman
2020-11-24 13:17:53 +05:30
committed by GitHub
parent d9517b6164
commit 459799760b
11 changed files with 137 additions and 37 deletions

View File

@@ -10,6 +10,7 @@ namespace comm
{
constexpr uint32_t INTERVALMS = 60000;
constexpr uint16_t UNVERIFIED_INACTIVE_TIMEOUT = 5; // Time threshold for unverified inactive connections in seconds.
constexpr uint16_t MAX_IN_MSG_QUEUE_SIZE = 64; // Maximum in message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31)....
comm_session::comm_session(
std::string_view host_address, hpws::client &&hpws_client, const bool is_inbound, const uint64_t (&metric_thresholds)[5])
@@ -17,7 +18,7 @@ namespace comm
host_address(host_address),
hpws_client(std::move(hpws_client)),
is_inbound(is_inbound),
in_msg_queue(32)
in_msg_queue(MAX_IN_MSG_QUEUE_SIZE)
{
// Create new session_thresholds and insert it to thresholds vector.
// Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector
@@ -76,7 +77,7 @@ namespace comm
std::string_view data = std::get<std::string_view>(read_result);
std::vector<char> msg(data.size());
memcpy(msg.data(), data.data(), data.size());
in_msg_queue.enqueue(std::move(msg));
in_msg_queue.try_enqueue(std::move(msg));
// Signal the hpws client that we are ready for next message.
std::optional<hpws::error> error = hpws_client->ack(data);
@@ -130,7 +131,7 @@ namespace comm
/**
* Adds the given message to the outbound message queue.
* @param message Message to be added to the outbound queue.
* @return 0 on successful addition and -1 if the session is already closed.
* @return 0 on successful addition and -1 if the session is already closed or there's no space in the queue.
*/
int comm_session::send(std::string_view message)
{

View File

@@ -299,8 +299,8 @@ namespace conf
cfg.peermaxdupmpm = d["peermaxdupmpm"].as<uint64_t>();
cfg.peermaxbadmpm = d["peermaxbadmpm"].as<uint64_t>();
cfg.peermaxbadsigpm = d["peermaxbadsigpm"].as<uint64_t>();
cfg.peermaxcons = d["peermaxcons"].as<unsigned int>();
cfg.peermaxknowncons = d["peermaxknowncons"].as<unsigned int>();
cfg.peermaxcons = d["peermaxcons"].as<uint16_t>();
cfg.peermaxknowncons = d["peermaxknowncons"].as<uint16_t>();
// If peermaxknowcons is greater than peermaxcons then show error and stop execution.
if (cfg.peermaxknowncons > cfg.peermaxcons)

View File

@@ -7,6 +7,8 @@
namespace ledger
{
constexpr const char *GENESIS_LEDGER = "0-genesis";
constexpr uint16_t HISTORY_REQ_LIST_CAP = 64; // Maximum history request count.
constexpr uint16_t HISTORY_RES_LIST_CAP = 64; // Maximum history response count.
struct sync_context
{
@@ -68,7 +70,7 @@ namespace ledger
void deinit();
void lcl_syncer_loop();
void set_sync_target(std::string_view target_lcl);
const std::pair<uint64_t, std::string> get_ledger_cache_top();

View File

@@ -359,14 +359,22 @@ namespace p2p
// Otherwise if new peer is recently updated (timestamp >) replace with the current one.
if (itr == ctx.server->req_known_remotes.end())
{
ctx.server->req_known_remotes.push_back(peer);
LOG_DEBUG << "Adding " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " to the known peer list";
// If maximum number of peer list reached skip the rest of peers.
if (ctx.server->req_known_remotes.size() < p2p::PEER_LIST_CAP)
{
ctx.server->req_known_remotes.push_back(peer);
LOG_DEBUG << "Adding " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " to the known peer list.";
}
else
{
LOG_DEBUG << "Rejecting " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + ". Maximum peer count reached.";
}
}
else if (itr->timestamp < peer.timestamp)
{
itr->available_capacity = peer.available_capacity;
itr->timestamp = peer.timestamp;
LOG_DEBUG << "Replacing " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " to the known peer list";
LOG_DEBUG << "Replacing " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " in the known peer list.";
}
}

View File

@@ -12,6 +12,12 @@
namespace p2p
{
constexpr uint16_t PROPOSAL_LIST_CAP = 64; // Maximum proposal count.
constexpr uint16_t NONUNL_PROPOSAL_LIST_CAP = 64; // Maximum nonunl proposal count.
constexpr uint16_t STATE_REQ_LIST_CAP = 64; // Maximum state request count.
constexpr uint16_t STATE_RES_LIST_CAP = 64; // Maximum state response count.
constexpr uint16_t PEER_LIST_CAP = 64; // Maximum peer count.
struct proposal
{
std::string pubkey;

View File

@@ -156,11 +156,13 @@ namespace p2p
return 0;
}
handle_proposal_message(container, content);
if (handle_proposal_message(container, content) != 0)
LOG_DEBUG << "Proposal rejected. Maximum proposal count reached. " << session.display_name();
}
else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message
{
handle_nonunl_proposal_message(container, content);
if (handle_nonunl_proposal_message(container, content) != 0)
LOG_DEBUG << "Nonunl proposal rejected. Maximum nonunl proposal count reached. " << session.display_name();
}
else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message
{
@@ -175,34 +177,72 @@ namespace p2p
}
else if (content_message_type == p2pmsg::Message_State_Request_Message)
{
// Insert request with lock.
// Check the cap and insert request with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.state_requests_mutex);
std::string state_request_msg(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.state_requests.push_back(std::make_pair(session.uniqueid, std::move(state_request_msg)));
// If max number of state requests reached skip the rest.
if (ctx.collected_msgs.state_requests.size() < p2p::STATE_REQ_LIST_CAP)
{
std::string state_request_msg(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.state_requests.push_back(std::make_pair(session.uniqueid, std::move(state_request_msg)));
}
else
{
LOG_DEBUG << "State request rejected. Maximum state request count reached. " << session.display_name();
}
}
else if (content_message_type == p2pmsg::Message_State_Response_Message)
{
if (state_sync::ctx.is_syncing) // Only accept state responses if state is syncing.
{
// Insert state_response with lock.
// Check the cap and insert state_response with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.state_responses_mutex);
std::string response(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.state_responses.push_back(std::make_pair(session.uniqueid, std::move(response)));
// If max number of state responses reached skip the rest.
if (ctx.collected_msgs.state_responses.size() < p2p::STATE_RES_LIST_CAP)
{
std::string response(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.state_responses.push_back(std::make_pair(session.uniqueid, std::move(response)));
}
else
{
LOG_DEBUG << "State response rejected. Maximum state response count reached. " << session.display_name();
}
}
}
else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message
{
const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message());
// Check the cap and insert request with lock.
std::scoped_lock<std::mutex> lock(ledger::sync_ctx.list_mutex);
ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.uniqueid, std::move(hr)));
// If max number of history requests reached skip the rest.
if (ledger::sync_ctx.collected_history_requests.size() < ledger::HISTORY_REQ_LIST_CAP)
{
const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message());
ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.uniqueid, std::move(hr)));
}
else
{
LOG_DEBUG << "History request rejected. Maximum history request count reached. " << session.display_name();
}
}
else if (content_message_type == p2pmsg::Message_History_Response_Message) //message is a lcl history response message
{
if (ledger::sync_ctx.is_syncing) // Only accept history responses if ledger is syncing.
{
const p2p::history_response hr = p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message());
// Check the cap and insert response with lock.
std::scoped_lock<std::mutex> lock(ledger::sync_ctx.list_mutex);
ledger::sync_ctx.collected_history_responses.push_back(std::move(hr));
// If max number of history respoinses reached skip the rest.
if (ledger::sync_ctx.collected_history_responses.size() < ledger::HISTORY_RES_LIST_CAP)
{
const p2p::history_response hr = p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message());
ledger::sync_ctx.collected_history_responses.push_back(std::move(hr));
}
else
{
LOG_DEBUG << "History response rejected. Maximum history response count reached. " << session.display_name();
}
}
}
else
@@ -236,29 +276,61 @@ namespace p2p
const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc
if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message
handle_proposal_message(container, content);
{
if (handle_proposal_message(container, content) != 0)
LOG_DEBUG << "Proposal rejected. Maximum proposal count reached. self";
}
else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message
handle_nonunl_proposal_message(container, content);
{
if (handle_nonunl_proposal_message(container, content) != 0)
LOG_DEBUG << "Nonunl proposal rejected. Maximum nonunl proposal count reached. self";
}
else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message
handle_npl_message(container, content);
return 0;
}
void handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content)
/**
* Handle proposal message.
* @param container Message container.
* @param content Message content.
* @return returns 0 if proposal is pushed to the list, otherwise -1.
*/
int handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content)
{
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.proposals_mutex); // Insert proposal with lock.
// Check the cap and insert proposal with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.proposals_mutex);
// If max number of proposals reached skip the rest.
if (ctx.collected_msgs.proposals.size() == p2p::PROPOSAL_LIST_CAP)
return -1;
ctx.collected_msgs.proposals.push_back(
p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp(), container->lcl()));
return 0;
}
void handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content)
/**
* Handle nonunl proposal message.
* @param container Message container.
* @param content Message content.
* @return returns 0 if nonunl proposal is pushed to the list, otherwise -1.
*/
int handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content)
{
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.nonunl_proposals_mutex); // Insert non-unl proposal with lock.
// Check the cap and insert proposal with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.nonunl_proposals_mutex);
// If max number of nonunl proposals reached skip the rest.
if (ctx.collected_msgs.nonunl_proposals.size() == p2p::NONUNL_PROPOSAL_LIST_CAP)
return -1;
ctx.collected_msgs.nonunl_proposals.push_back(
p2pmsg::create_nonunl_proposal_from_msg(*content->message_as_NonUnl_Proposal_Message(), container->timestamp()));
return 0;
}
void handle_npl_message(const p2pmsg::Container *container, const p2pmsg::Content *content)

View File

@@ -14,9 +14,9 @@ namespace p2p
int handle_peer_message(p2p::peer_comm_session &session, std::string_view message);
int handle_self_message(std::string_view message);
int handle_peer_close(const p2p::peer_comm_session &session);
int handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content);
int handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content);
void handle_peer_on_verified(p2p::peer_comm_session &session);
void handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content);
void handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content);
void handle_npl_message(const p2pmsg::Container *container, const p2pmsg::Content *content);
} // namespace p2p

View File

@@ -13,6 +13,8 @@
*/
namespace sc
{
constexpr uint16_t MAX_NPL_MSG_QUEUE_SIZE = 64; // Maximum npl message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31)....
constexpr uint16_t MAX_CONTROL_MSG_QUEUE_SIZE = 64; // Maximum out message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31)....
// Enum used to differenciate socket fds maintained for SC socket.
enum SOCKETFDTYPE
@@ -86,7 +88,10 @@ namespace sc
// State hash after execution will be copied to this (not applicable to read only mode).
hpfs::h32 post_execution_state_hash = hpfs::h32_empty;
contract_execution_args(util::buffer_store &user_input_store) : user_input_store(user_input_store)
contract_execution_args(util::buffer_store &user_input_store)
: user_input_store(user_input_store),
npl_messages(MAX_NPL_MSG_QUEUE_SIZE),
control_messages(MAX_CONTROL_MSG_QUEUE_SIZE)
{
}
};

View File

@@ -12,9 +12,9 @@
*/
namespace read_req
{
constexpr uint16_t LOOP_WAIT = 100; // Milliseconds.
constexpr uint16_t MAX_QUEUE_SIZE = 100; // Maximum read request queue size.
constexpr uint16_t MAX_THREAD_CAP = 5; // Maximum number of read request processing threads.
constexpr uint16_t LOOP_WAIT = 100; // Milliseconds.
constexpr uint16_t MAX_QUEUE_SIZE = 64; // Maximum read request queue size, The size passed is rounded up to the next multiple of the block size (32).
constexpr uint16_t MAX_THREAD_CAP = 5; // Maximum number of read request processing threads.
bool is_shutting_down = false;
bool init_success = false;
@@ -22,7 +22,7 @@ namespace read_req
util::buffer_store read_req_store;
std::thread thread_pool_executor; // Thread which spawns new threads for the read requests is the queue.
std::vector<std::thread> read_req_threads;
moodycamel::ConcurrentQueue<user_read_req> read_req_queue(MAX_QUEUE_SIZE);
moodycamel::ConcurrentQueue<user_read_req> read_req_queue(MAX_QUEUE_SIZE, 0, MAX_THREAD_CAP);
std::mutex execution_contexts_mutex;
std::list<sc::execution_context> execution_contexts;
std::mutex completed_threads_mutex;

View File

@@ -201,6 +201,13 @@ namespace usr
*/
int add_user(usr::user_comm_session &session, const std::string &pubkey_hex, std::string_view protocol_code)
{
// If max number of user connections reached skip the rest.
if (ctx.users.size() == MAX_USER_COUNT)
{
LOG_DEBUG << "Rejecting " + session.display_name() << ". Maximum user count reached.";
return -1;
}
// Decode hex pubkey and get binary pubkey. We are only going to keep
// the binary pubkey due to reduced memory footprint.
std::string pubkey;

View File

@@ -16,6 +16,8 @@
*/
namespace usr
{
constexpr uint16_t MAX_USER_COUNT = 64; // Maximum number of user.
/**
* Holds information about an authenticated (challenge-verified) user
* connected to the HotPocket node.
@@ -28,9 +30,6 @@ namespace usr
// Holds the unprocessed user inputs collected from websocket.
std::list<user_input> submitted_inputs;
// Holds the unprocessed read requests collected from websocket.
std::list<std::string> read_requests;
// Holds the websocket session of this user.
// We don't need to own the session object since the lifetime of user and session are coupled.
usr::user_comm_session &session;