diff --git a/src/conf.cpp b/src/conf.cpp index d66d73b1..c1651509 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -78,7 +78,6 @@ namespace conf boost::filesystem::create_directories(ctx.config_dir); boost::filesystem::create_directories(ctx.hist_dir); boost::filesystem::create_directories(ctx.state_rw_dir); - boost::filesystem::create_directories(ctx.state_read_req_dir); //Create config file with default settings. @@ -146,7 +145,6 @@ namespace conf ctx.hist_dir = basedir + "/hist"; ctx.state_dir = basedir + "/state"; ctx.state_rw_dir = ctx.state_dir + "/rw"; - ctx.state_read_req_dir = ctx.state_dir + "/rr"; ctx.log_dir = basedir + "/log"; } diff --git a/src/conf.hpp b/src/conf.hpp index ca9dada5..7e461160 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -35,7 +35,6 @@ namespace conf std::string hist_dir; // Contract ledger history dir full path std::string state_dir; // Contract state maintenence path (hpfs path) std::string state_rw_dir; // Contract executation read/write state path. - std::string state_read_req_dir; // Contract executation state path for read requests. std::string log_dir; // Contract log dir full path std::string config_dir; // Contract config dir full path std::string config_file; // Full path to the contract config file diff --git a/src/usr/read_req.cpp b/src/usr/read_req.cpp index 7422d621..ca989b09 100644 --- a/src/usr/read_req.cpp +++ b/src/usr/read_req.cpp @@ -1,7 +1,6 @@ #include "../pchheader.hpp" #include "../hplog.hpp" #include "../util.hpp" -#include "../sc.hpp" #include "../conf.hpp" #include "../msg/usrmsg_parser.hpp" #include "usr.hpp" @@ -12,18 +11,23 @@ */ namespace read_req { - constexpr uint16_t LOOP_WAIT = 100; // Milliseconds + constexpr uint16_t LOOP_WAIT = 100; // Milliseconds. + constexpr uint16_t MAX_QUEUE_SIZE = 100; // Maximum read request queue size. + constexpr uint16_t MAX_THREAD_CAP = 5; // Maximum number of read request processing threads. + bool is_shutting_down = false; bool init_success = false; - std::thread read_req_thread; - sc::execution_context contract_ctx; + std::thread thread_pool_executor; // Thread which spawns new threads for the read requests is the queue. + std::vector read_req_threads; + moodycamel::ConcurrentQueue read_req_queue(MAX_QUEUE_SIZE); + std::mutex execution_contexts_mutex; + std::list execution_contexts; + std::mutex completed_threads_mutex; + std::vector completed_threads; int init() { - contract_ctx.args.state_dir = conf::ctx.state_read_req_dir; - contract_ctx.args.readonly = true; - - read_req_thread = std::thread(read_request_processor); + thread_pool_executor = std::thread(manage_thread_pool); init_success = true; return 0; } @@ -34,108 +38,200 @@ namespace read_req { is_shutting_down = true; - // Stop the contract if running. - sc::stop(contract_ctx); + // Joining thread pool executor. + thread_pool_executor.join(); - read_req_thread.join(); + { + // Force stoping all running contracts. + std::scoped_lock lock(execution_contexts_mutex); + for (sc::execution_context &execution_context : execution_contexts) + sc::stop(execution_context); + } + + // Joining all read request processing threads. + for (std::thread &thread : read_req_threads) + thread.join(); } } - void read_request_processor() + /** + * Processing read requests via multiple threads by checking for maximum thread cap and read request availability. + */ + void manage_thread_pool() { + LOG_INFO << "Read request thread pool manager started."; util::mask_signal(); - LOG_INFO << "Read request server started."; - - // Lists of read requests submitted by users keyed by user pubkey. - std::unordered_map> read_requests; - while (!is_shutting_down) { - util::sleep(LOOP_WAIT); - + // Cleanup any exited threads. { - std::lock_guard lock(usr::ctx.users_mutex); - - // Move collected read requests from users over to local requests list. - for (auto &[sid, user] : usr::ctx.users) + std::scoped_lock lock(completed_threads_mutex); + if (!completed_threads.empty()) { - if (!user.read_requests.empty()) + // Remove all the completed threads from the read_req_threads list. + for (const pthread_t thread_id : completed_threads) { - std::list user_read_requests; - user_read_requests.splice(user_read_requests.end(), user.read_requests); - - read_requests.try_emplace(user.pubkey, std::move(user_read_requests)); + // Remove thread with the given completed thread id from the read_req_threads list. + remove_thread(thread_id); } + + LOG_DBG << completed_threads.size() << " threads cleaned from read requests thread pool."; + + // Clear the completed thread id list once the completed threads are removed from the list. + completed_threads.clear(); } } - if (!read_requests.empty()) + if (read_req_queue.size_approx() != 0 && read_req_threads.size() <= MAX_THREAD_CAP) { - LOG_DBG << "Processing read requests... count:" << read_requests.size(); + read_req_threads.push_back(std::thread(read_request_processor)); + if (read_req_queue.size_approx() == 1) + { + // The sleep is added to avoid creating a new thread before the newly created thread dequeue the job + // from the queue. + util::sleep(10); + } + } + else + { + util::sleep(LOOP_WAIT); + } + } + LOG_INFO << "Read request thread pool manager ended."; + } + + /** + * Process read requests from read request queue and execute smart contract. + * Process all the available read requests and exits if the queue is empty. + */ + void read_request_processor() + { + LOG_DBG << "A new read request processing thread started."; + + util::mask_signal(); + + std::list::iterator context_itr; + + // Own pthread id. + const pthread_t thread_id = pthread_self(); + + while (!is_shutting_down) + { + user_read_req read_request; + if (read_req_queue.try_dequeue(read_request)) + { + { + // Contract context is added to the list for force kill if a SIGINT is received. + sc::execution_context contract_ctx; + std::scoped_lock execution_contract_lock(execution_contexts_mutex); + context_itr = execution_contexts.emplace(execution_contexts.begin(), std::move(contract_ctx)); + } + + // Populate execution context data if any read requests are available in the queue. + initialize_execution_context(std::move(read_request), thread_id, *context_itr); + LOG_DBG << "Read request contract execution started."; // Process the read requests by executing the contract. - if (execute_contract(read_requests) != -1) + if (sc::execute_contract(*context_itr) != -1) { - // If contract execution was succcessful, send the outputs back to users. + // If contract execution was succcessful, send the output back to user. std::lock_guard lock(usr::ctx.users_mutex); - uint32_t dispatch_count = 0; - for (auto &[pubkey, bufpair] : contract_ctx.args.userbufs) + const auto user_buf_itr = context_itr->args.userbufs.begin(); + if (!user_buf_itr->second.output.empty()) { - if (!bufpair.output.empty()) + // Find the user session by user pubkey. + const auto sess_itr = usr::ctx.sessionids.find(user_buf_itr->first); + if (sess_itr != usr::ctx.sessionids.end()) // match found { - // Find the user session by user pubkey. - const auto sess_itr = usr::ctx.sessionids.find(pubkey); - if (sess_itr != usr::ctx.sessionids.end()) // match found + const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id. + if (user_itr != usr::ctx.users.end()) // match found { - const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id. - if (user_itr != usr::ctx.users.end()) // match found - { - std::string outputtosend; - outputtosend.swap(bufpair.output); + std::string outputtosend; + outputtosend.swap(user_buf_itr->second.output); - const usr::connected_user &user = user_itr->second; - msg::usrmsg::usrmsg_parser parser(user.protocol); + const usr::connected_user &user = user_itr->second; + msg::usrmsg::usrmsg_parser parser(user.protocol); - std::vector msg; - parser.create_contract_read_response_container(msg, outputtosend); - - user.session.send(msg); - dispatch_count++; - } + std::vector msg; + parser.create_contract_read_response_container(msg, outputtosend); + user.session.send(msg); } } } - - sc::clear_args(contract_ctx.args); - LOG_DBG << "Dispatched read request responses. count:" << dispatch_count; + LOG_DBG << "Read request contract execution ended."; } else { - LOG_ERR << "Contract execution for read requests failed."; + LOG_ERR << "Contract execution for read request failed."; } - read_requests.clear(); + // Remove successfully executed execution contexts. + std::scoped_lock execution_contract_lock(execution_contexts_mutex); + execution_contexts.erase(context_itr); + } + else + { + LOG_DBG << "Thread exits, due to no more read requests."; + // Break while loop if no read request is present in the queue for processing. + break; } } - LOG_INFO << "Read request server stopped."; + // Add current thread id to to the list of completed threads. + std::scoped_lock lock(completed_threads_mutex); + completed_threads.push_back(thread_id); + + LOG_DBG << "Read request processing thread exited."; } - int execute_contract(std::unordered_map> &read_requests) + /** + * Add new read request from users to the read request queue for processing. + * @param pubkey Public key of the user. + * @param content Message content. + * @return 0 on successful addition and -1 on queue overflow + */ + int populate_read_req_queue(const std::string &pubkey, const std::string &content) { - // Populate read requests to user buf map. - for (auto &[pubkey, requests] : read_requests) + sc::execution_context contract_ctx; + + user_read_req read_request; + read_request.content = std::move(content); + read_request.pubkey = pubkey; + + return read_req_queue.try_enqueue(read_request); + } + + /** + * Populate execution context data from the given read request. + * @param read_request Received read request. + * @param contract_ctx Execution context to be populated. + * @param thread_id Id of the current thread. + */ + void initialize_execution_context(const user_read_req &read_request, const pthread_t thread_id, sc::execution_context &contract_ctx) + { + // Create new folder with the thread id per each thread. + contract_ctx.args.state_dir = conf::ctx.state_dir; + contract_ctx.args.state_dir.append("/rr_").append(std::to_string(thread_id)); + contract_ctx.args.readonly = true; + sc::contract_iobuf_pair user_bufpair; + user_bufpair.inputs.push_back(std::move(read_request.content)); + contract_ctx.args.userbufs.try_emplace(read_request.pubkey, std::move(user_bufpair)); + } + + /** + * Join the thread with the given id and remove from the thread list. + * @param id Id of the thread to be joined and removed. + */ + void remove_thread(const pthread_t id) + { + const auto iter = std::find_if(read_req_threads.begin(), read_req_threads.end(), [=](std::thread &t) { return (t.native_handle() == id); }); + if (iter != read_req_threads.end()) { - sc::contract_iobuf_pair user_bufpair; - user_bufpair.inputs.splice(user_bufpair.inputs.end(), requests); - - contract_ctx.args.userbufs.try_emplace(pubkey, std::move(user_bufpair)); + iter->join(); + read_req_threads.erase(iter); } - - // Execute the contract. - return sc::execute_contract(contract_ctx); } } // namespace read_req \ No newline at end of file diff --git a/src/usr/read_req.hpp b/src/usr/read_req.hpp index ccaa5a99..669f0044 100644 --- a/src/usr/read_req.hpp +++ b/src/usr/read_req.hpp @@ -1,15 +1,29 @@ #ifndef _HP_CONS_READ_REQ_ #define _HP_CONS_READ_REQ_ +#include "../sc.hpp" + namespace read_req { + struct user_read_req + { + std::string pubkey; + std::string content; + }; + int init(); void deinit(); - + + void manage_thread_pool(); + void read_request_processor(); - int execute_contract(std::unordered_map> &read_requests); + int populate_read_req_queue(const std::string &pubkey, const std::string &content); + + void initialize_execution_context(const user_read_req &read_request, const pthread_t thread_id, sc::execution_context &contract_ctx); + + void remove_thread(const pthread_t id); } // namespace read_req diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 715f89c1..66492280 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -11,6 +11,7 @@ #include "usr.hpp" #include "user_session_handler.hpp" #include "user_input.hpp" +#include "read_req.hpp" namespace usr { @@ -143,10 +144,7 @@ namespace usr std::string content; if (parser.extract_read_request(content) == 0) { - std::lock_guard lock(ctx.users_mutex); - - //Add to the user's pending read requests list. - user.read_requests.push_back(std::move(content)); + read_req::populate_read_req_queue(user.pubkey, std::move(content)); return 0; } else