mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Parallel read request processing with thread pool. (#118)
This commit is contained in:
@@ -78,7 +78,6 @@ namespace conf
|
||||
boost::filesystem::create_directories(ctx.config_dir);
|
||||
boost::filesystem::create_directories(ctx.hist_dir);
|
||||
boost::filesystem::create_directories(ctx.state_rw_dir);
|
||||
boost::filesystem::create_directories(ctx.state_read_req_dir);
|
||||
|
||||
//Create config file with default settings.
|
||||
|
||||
@@ -146,7 +145,6 @@ namespace conf
|
||||
ctx.hist_dir = basedir + "/hist";
|
||||
ctx.state_dir = basedir + "/state";
|
||||
ctx.state_rw_dir = ctx.state_dir + "/rw";
|
||||
ctx.state_read_req_dir = ctx.state_dir + "/rr";
|
||||
ctx.log_dir = basedir + "/log";
|
||||
}
|
||||
|
||||
|
||||
@@ -35,7 +35,6 @@ namespace conf
|
||||
std::string hist_dir; // Contract ledger history dir full path
|
||||
std::string state_dir; // Contract state maintenence path (hpfs path)
|
||||
std::string state_rw_dir; // Contract executation read/write state path.
|
||||
std::string state_read_req_dir; // Contract executation state path for read requests.
|
||||
std::string log_dir; // Contract log dir full path
|
||||
std::string config_dir; // Contract config dir full path
|
||||
std::string config_file; // Full path to the contract config file
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
#include "../pchheader.hpp"
|
||||
#include "../hplog.hpp"
|
||||
#include "../util.hpp"
|
||||
#include "../sc.hpp"
|
||||
#include "../conf.hpp"
|
||||
#include "../msg/usrmsg_parser.hpp"
|
||||
#include "usr.hpp"
|
||||
@@ -12,18 +11,23 @@
|
||||
*/
|
||||
namespace read_req
|
||||
{
|
||||
constexpr uint16_t LOOP_WAIT = 100; // Milliseconds
|
||||
constexpr uint16_t LOOP_WAIT = 100; // Milliseconds.
|
||||
constexpr uint16_t MAX_QUEUE_SIZE = 100; // Maximum read request queue size.
|
||||
constexpr uint16_t MAX_THREAD_CAP = 5; // Maximum number of read request processing threads.
|
||||
|
||||
bool is_shutting_down = false;
|
||||
bool init_success = false;
|
||||
std::thread read_req_thread;
|
||||
sc::execution_context contract_ctx;
|
||||
std::thread thread_pool_executor; // Thread which spawns new threads for the read requests is the queue.
|
||||
std::vector<std::thread> read_req_threads;
|
||||
moodycamel::ConcurrentQueue<user_read_req> read_req_queue(MAX_QUEUE_SIZE);
|
||||
std::mutex execution_contexts_mutex;
|
||||
std::list<sc::execution_context> execution_contexts;
|
||||
std::mutex completed_threads_mutex;
|
||||
std::vector<pthread_t> completed_threads;
|
||||
|
||||
int init()
|
||||
{
|
||||
contract_ctx.args.state_dir = conf::ctx.state_read_req_dir;
|
||||
contract_ctx.args.readonly = true;
|
||||
|
||||
read_req_thread = std::thread(read_request_processor);
|
||||
thread_pool_executor = std::thread(manage_thread_pool);
|
||||
init_success = true;
|
||||
return 0;
|
||||
}
|
||||
@@ -34,108 +38,200 @@ namespace read_req
|
||||
{
|
||||
is_shutting_down = true;
|
||||
|
||||
// Stop the contract if running.
|
||||
sc::stop(contract_ctx);
|
||||
// Joining thread pool executor.
|
||||
thread_pool_executor.join();
|
||||
|
||||
read_req_thread.join();
|
||||
{
|
||||
// Force stoping all running contracts.
|
||||
std::scoped_lock<std::mutex> lock(execution_contexts_mutex);
|
||||
for (sc::execution_context &execution_context : execution_contexts)
|
||||
sc::stop(execution_context);
|
||||
}
|
||||
|
||||
// Joining all read request processing threads.
|
||||
for (std::thread &thread : read_req_threads)
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
void read_request_processor()
|
||||
/**
|
||||
* Processing read requests via multiple threads by checking for maximum thread cap and read request availability.
|
||||
*/
|
||||
void manage_thread_pool()
|
||||
{
|
||||
LOG_INFO << "Read request thread pool manager started.";
|
||||
util::mask_signal();
|
||||
|
||||
LOG_INFO << "Read request server started.";
|
||||
|
||||
// Lists of read requests submitted by users keyed by user pubkey.
|
||||
std::unordered_map<std::string, std::list<std::string>> read_requests;
|
||||
|
||||
while (!is_shutting_down)
|
||||
{
|
||||
util::sleep(LOOP_WAIT);
|
||||
|
||||
// Cleanup any exited threads.
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(usr::ctx.users_mutex);
|
||||
|
||||
// Move collected read requests from users over to local requests list.
|
||||
for (auto &[sid, user] : usr::ctx.users)
|
||||
std::scoped_lock<std::mutex> lock(completed_threads_mutex);
|
||||
if (!completed_threads.empty())
|
||||
{
|
||||
if (!user.read_requests.empty())
|
||||
// Remove all the completed threads from the read_req_threads list.
|
||||
for (const pthread_t thread_id : completed_threads)
|
||||
{
|
||||
std::list<std::string> user_read_requests;
|
||||
user_read_requests.splice(user_read_requests.end(), user.read_requests);
|
||||
|
||||
read_requests.try_emplace(user.pubkey, std::move(user_read_requests));
|
||||
// Remove thread with the given completed thread id from the read_req_threads list.
|
||||
remove_thread(thread_id);
|
||||
}
|
||||
|
||||
LOG_DBG << completed_threads.size() << " threads cleaned from read requests thread pool.";
|
||||
|
||||
// Clear the completed thread id list once the completed threads are removed from the list.
|
||||
completed_threads.clear();
|
||||
}
|
||||
}
|
||||
|
||||
if (!read_requests.empty())
|
||||
if (read_req_queue.size_approx() != 0 && read_req_threads.size() <= MAX_THREAD_CAP)
|
||||
{
|
||||
LOG_DBG << "Processing read requests... count:" << read_requests.size();
|
||||
read_req_threads.push_back(std::thread(read_request_processor));
|
||||
if (read_req_queue.size_approx() == 1)
|
||||
{
|
||||
// The sleep is added to avoid creating a new thread before the newly created thread dequeue the job
|
||||
// from the queue.
|
||||
util::sleep(10);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
util::sleep(LOOP_WAIT);
|
||||
}
|
||||
}
|
||||
LOG_INFO << "Read request thread pool manager ended.";
|
||||
}
|
||||
|
||||
/**
|
||||
* Process read requests from read request queue and execute smart contract.
|
||||
* Process all the available read requests and exits if the queue is empty.
|
||||
*/
|
||||
void read_request_processor()
|
||||
{
|
||||
LOG_DBG << "A new read request processing thread started.";
|
||||
|
||||
util::mask_signal();
|
||||
|
||||
std::list<sc::execution_context>::iterator context_itr;
|
||||
|
||||
// Own pthread id.
|
||||
const pthread_t thread_id = pthread_self();
|
||||
|
||||
while (!is_shutting_down)
|
||||
{
|
||||
user_read_req read_request;
|
||||
if (read_req_queue.try_dequeue(read_request))
|
||||
{
|
||||
{
|
||||
// Contract context is added to the list for force kill if a SIGINT is received.
|
||||
sc::execution_context contract_ctx;
|
||||
std::scoped_lock<std::mutex> execution_contract_lock(execution_contexts_mutex);
|
||||
context_itr = execution_contexts.emplace(execution_contexts.begin(), std::move(contract_ctx));
|
||||
}
|
||||
|
||||
// Populate execution context data if any read requests are available in the queue.
|
||||
initialize_execution_context(std::move(read_request), thread_id, *context_itr);
|
||||
LOG_DBG << "Read request contract execution started.";
|
||||
|
||||
// Process the read requests by executing the contract.
|
||||
if (execute_contract(read_requests) != -1)
|
||||
if (sc::execute_contract(*context_itr) != -1)
|
||||
{
|
||||
// If contract execution was succcessful, send the outputs back to users.
|
||||
// If contract execution was succcessful, send the output back to user.
|
||||
std::lock_guard<std::mutex> lock(usr::ctx.users_mutex);
|
||||
|
||||
uint32_t dispatch_count = 0;
|
||||
for (auto &[pubkey, bufpair] : contract_ctx.args.userbufs)
|
||||
const auto user_buf_itr = context_itr->args.userbufs.begin();
|
||||
if (!user_buf_itr->second.output.empty())
|
||||
{
|
||||
if (!bufpair.output.empty())
|
||||
// Find the user session by user pubkey.
|
||||
const auto sess_itr = usr::ctx.sessionids.find(user_buf_itr->first);
|
||||
if (sess_itr != usr::ctx.sessionids.end()) // match found
|
||||
{
|
||||
// Find the user session by user pubkey.
|
||||
const auto sess_itr = usr::ctx.sessionids.find(pubkey);
|
||||
if (sess_itr != usr::ctx.sessionids.end()) // match found
|
||||
const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id.
|
||||
if (user_itr != usr::ctx.users.end()) // match found
|
||||
{
|
||||
const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id.
|
||||
if (user_itr != usr::ctx.users.end()) // match found
|
||||
{
|
||||
std::string outputtosend;
|
||||
outputtosend.swap(bufpair.output);
|
||||
std::string outputtosend;
|
||||
outputtosend.swap(user_buf_itr->second.output);
|
||||
|
||||
const usr::connected_user &user = user_itr->second;
|
||||
msg::usrmsg::usrmsg_parser parser(user.protocol);
|
||||
const usr::connected_user &user = user_itr->second;
|
||||
msg::usrmsg::usrmsg_parser parser(user.protocol);
|
||||
|
||||
std::vector<uint8_t> msg;
|
||||
parser.create_contract_read_response_container(msg, outputtosend);
|
||||
|
||||
user.session.send(msg);
|
||||
dispatch_count++;
|
||||
}
|
||||
std::vector<uint8_t> msg;
|
||||
parser.create_contract_read_response_container(msg, outputtosend);
|
||||
user.session.send(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sc::clear_args(contract_ctx.args);
|
||||
LOG_DBG << "Dispatched read request responses. count:" << dispatch_count;
|
||||
LOG_DBG << "Read request contract execution ended.";
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_ERR << "Contract execution for read requests failed.";
|
||||
LOG_ERR << "Contract execution for read request failed.";
|
||||
}
|
||||
|
||||
read_requests.clear();
|
||||
// Remove successfully executed execution contexts.
|
||||
std::scoped_lock<std::mutex> execution_contract_lock(execution_contexts_mutex);
|
||||
execution_contexts.erase(context_itr);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DBG << "Thread exits, due to no more read requests.";
|
||||
// Break while loop if no read request is present in the queue for processing.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO << "Read request server stopped.";
|
||||
// Add current thread id to to the list of completed threads.
|
||||
std::scoped_lock<std::mutex> lock(completed_threads_mutex);
|
||||
completed_threads.push_back(thread_id);
|
||||
|
||||
LOG_DBG << "Read request processing thread exited.";
|
||||
}
|
||||
|
||||
int execute_contract(std::unordered_map<std::string, std::list<std::string>> &read_requests)
|
||||
/**
|
||||
* Add new read request from users to the read request queue for processing.
|
||||
* @param pubkey Public key of the user.
|
||||
* @param content Message content.
|
||||
* @return 0 on successful addition and -1 on queue overflow
|
||||
*/
|
||||
int populate_read_req_queue(const std::string &pubkey, const std::string &content)
|
||||
{
|
||||
// Populate read requests to user buf map.
|
||||
for (auto &[pubkey, requests] : read_requests)
|
||||
sc::execution_context contract_ctx;
|
||||
|
||||
user_read_req read_request;
|
||||
read_request.content = std::move(content);
|
||||
read_request.pubkey = pubkey;
|
||||
|
||||
return read_req_queue.try_enqueue(read_request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Populate execution context data from the given read request.
|
||||
* @param read_request Received read request.
|
||||
* @param contract_ctx Execution context to be populated.
|
||||
* @param thread_id Id of the current thread.
|
||||
*/
|
||||
void initialize_execution_context(const user_read_req &read_request, const pthread_t thread_id, sc::execution_context &contract_ctx)
|
||||
{
|
||||
// Create new folder with the thread id per each thread.
|
||||
contract_ctx.args.state_dir = conf::ctx.state_dir;
|
||||
contract_ctx.args.state_dir.append("/rr_").append(std::to_string(thread_id));
|
||||
contract_ctx.args.readonly = true;
|
||||
sc::contract_iobuf_pair user_bufpair;
|
||||
user_bufpair.inputs.push_back(std::move(read_request.content));
|
||||
contract_ctx.args.userbufs.try_emplace(read_request.pubkey, std::move(user_bufpair));
|
||||
}
|
||||
|
||||
/**
|
||||
* Join the thread with the given id and remove from the thread list.
|
||||
* @param id Id of the thread to be joined and removed.
|
||||
*/
|
||||
void remove_thread(const pthread_t id)
|
||||
{
|
||||
const auto iter = std::find_if(read_req_threads.begin(), read_req_threads.end(), [=](std::thread &t) { return (t.native_handle() == id); });
|
||||
if (iter != read_req_threads.end())
|
||||
{
|
||||
sc::contract_iobuf_pair user_bufpair;
|
||||
user_bufpair.inputs.splice(user_bufpair.inputs.end(), requests);
|
||||
|
||||
contract_ctx.args.userbufs.try_emplace(pubkey, std::move(user_bufpair));
|
||||
iter->join();
|
||||
read_req_threads.erase(iter);
|
||||
}
|
||||
|
||||
// Execute the contract.
|
||||
return sc::execute_contract(contract_ctx);
|
||||
}
|
||||
|
||||
} // namespace read_req
|
||||
@@ -1,15 +1,29 @@
|
||||
#ifndef _HP_CONS_READ_REQ_
|
||||
#define _HP_CONS_READ_REQ_
|
||||
|
||||
#include "../sc.hpp"
|
||||
|
||||
namespace read_req
|
||||
{
|
||||
struct user_read_req
|
||||
{
|
||||
std::string pubkey;
|
||||
std::string content;
|
||||
};
|
||||
|
||||
int init();
|
||||
|
||||
void deinit();
|
||||
|
||||
|
||||
void manage_thread_pool();
|
||||
|
||||
void read_request_processor();
|
||||
|
||||
int execute_contract(std::unordered_map<std::string, std::list<std::string>> &read_requests);
|
||||
int populate_read_req_queue(const std::string &pubkey, const std::string &content);
|
||||
|
||||
void initialize_execution_context(const user_read_req &read_request, const pthread_t thread_id, sc::execution_context &contract_ctx);
|
||||
|
||||
void remove_thread(const pthread_t id);
|
||||
|
||||
} // namespace read_req
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "usr.hpp"
|
||||
#include "user_session_handler.hpp"
|
||||
#include "user_input.hpp"
|
||||
#include "read_req.hpp"
|
||||
|
||||
namespace usr
|
||||
{
|
||||
@@ -143,10 +144,7 @@ namespace usr
|
||||
std::string content;
|
||||
if (parser.extract_read_request(content) == 0)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx.users_mutex);
|
||||
|
||||
//Add to the user's pending read requests list.
|
||||
user.read_requests.push_back(std::move(content));
|
||||
read_req::populate_read_req_queue(user.pubkey, std::move(content));
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
|
||||
Reference in New Issue
Block a user