Changes to support follow commands

This commit is contained in:
chalith
2023-11-01 13:53:19 +05:30
parent 835b702e7e
commit f463cd6705
5 changed files with 36 additions and 40 deletions

View File

@@ -124,7 +124,11 @@ async function main() {
hpc.getStatus().then(stat => console.log(stat));
}
else if (inp.startsWith("hpsh ")) {
hpc.submitHpshRequest(inp.substr(5)).then(reply => console.log(reply));
hpc.submitHpshRequest(inp.substr(5)).then(id => {
hpc.on(id, (reply) => {
console.log(reply);
})
});
}
else {

View File

@@ -43,7 +43,7 @@ namespace hpsh
std::string fd_str;
fd_str.resize(10);
snprintf(fd_str.data(), sizeof(fd_str), "%d", ctx.control_fds[0]);
snprintf(fd_str.data(), 10, "%d", ctx.control_fds[0]);
char *argv[] = {(char *)conf::ctx.hpsh_exe_path.data(), fd_str.data(), NULL};
@@ -149,8 +149,20 @@ namespace hpsh
void remove_user_commands(std::string_view user_pubkey)
{
std::scoped_lock lock(ctx.command_mutex);
ctx.commands.remove_if([&](const command_context &command)
{ return command.user_pubkey == user_pubkey; });
auto itr = ctx.commands.begin();
while (itr != ctx.commands.end())
{
if (itr->user_pubkey == user_pubkey)
{
// Close the file descriptor and remove the command from context.
close(itr->child_fds[1]);
itr = ctx.commands.erase(itr);
}
else
{
itr++;
}
}
}
int execute(std::string_view id, std::string_view user_pubkey, std::string_view message)
@@ -202,7 +214,7 @@ namespace hpsh
return -1;
}
if (write(child_fds[1], message.data(), sizeof(message)) < 0)
if (write(child_fds[1], message.data(), message.size()) < 0)
{
LOG_ERROR << errno << ": Error writing to child fd.";
close(child_fds[0]);
@@ -214,7 +226,7 @@ namespace hpsh
{
std::scoped_lock lock(ctx.command_mutex);
ctx.commands.push_back(command_context{std::string(id), std::string(user_pubkey), {child_fds[0], child_fds[1]}, std::string(), false});
ctx.commands.push_back(command_context{std::string(id), std::string(user_pubkey), {child_fds[0], child_fds[1]}});
}
return 0;
@@ -247,29 +259,12 @@ namespace hpsh
}
else if (pfd.revents & POLLIN)
{
itr->response.resize(READ_BUFFER_SIZE);
const int res = read(pfd.fd, itr->response.data(), READ_BUFFER_SIZE);
std::string response;
response.resize(READ_BUFFER_SIZE);
const int res = read(pfd.fd, response.data(), READ_BUFFER_SIZE);
if (res > 0)
itr->response.resize(res); // Resize back to the actual bytes read.
else if (res == -1)
{
// Assuming that EPIPE or ECONNRESET resulted from contract termination, consider this as a neutral read.
if (errno == EPIPE || errno == ECONNRESET)
itr->read_completed = true;
else
LOG_ERROR << errno << ": Error reading from fd";
}
}
else
{
itr->read_completed = true;
}
// Send command back to user;
if (itr->read_completed)
{
{
response.resize(res);
std::scoped_lock<std::mutex> lock(usr::ctx.users_mutex);
// Find the user session by user pubkey.
@@ -279,22 +274,21 @@ namespace hpsh
const usr::connected_user &user = user_itr->second;
msg::usrmsg::usrmsg_parser parser(user.protocol);
std::vector<uint8_t> msg;
parser.create_hpsh_response_container(msg, itr->id, itr->response);
parser.create_hpsh_response_container(msg, itr->id, response);
user.session.send(msg);
response.clear();
}
}
else if (res == -1)
{
LOG_ERROR << errno << ": Error reading from fd";
}
}
// Close the file descriptor and remove the command from context.
close(itr->child_fds[1]);
itr = ctx.commands.erase(itr);
}
else
{
itr++;
}
itr++;
}
}
util::sleep(1000);
util::sleep(100);
}
}
}

View File

@@ -12,8 +12,6 @@ namespace hpsh
std::string id;
std::string user_pubkey;
int child_fds[2];
std::string response;
bool read_completed = false;
};
struct hpsh_context

View File

@@ -278,7 +278,7 @@ namespace usr
std::string id, content;
if (hpsh::ctx.is_initialized && parser.extract_hpsh_request(id, content) != -1)
{
if (hpsh::execute(id, user.pubkey, content.c_str()) == -1)
if (hpsh::execute(id, user.pubkey, content) == -1)
return -1;
return 0;

Binary file not shown.