hpws upgrade with websocket protocol improvements. (#232)

* Updated hpws binary and header.
* Improved binary encoding support in client lib.
This commit is contained in:
Ravin Perera
2021-02-01 22:31:28 +05:30
committed by GitHub
parent d34b7bce53
commit 08680ee8d4
5 changed files with 129 additions and 946 deletions

View File

@@ -47,7 +47,7 @@ namespace hpws
// used when waiting for messages that should already be on the pipe
#define HPWS_SMALL_TIMEOUT 10
// used when waiting for server process to spawn
#define HPWS_LONG_TIMEOUT 2500
#define HPWS_LONG_TIMEOUT 50
typedef union
{
@@ -138,11 +138,11 @@ namespace hpws
for (int i = 0; i < 4; ++i)
{
munmap(buffer[i], max_buffer_size);
close(buffer_fd[i]);
::close(buffer_fd[i]);
}
close(control_line_fd[0]);
close(control_line_fd[1]);
::close(control_line_fd[0]);
::close(control_line_fd[1]);
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] child destructed pid = %d\n", child_pid);
@@ -215,6 +215,21 @@ namespace hpws
}
}
void close()
{
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] close called\n");
// send the control message informing hpws that we wish to close
char buf[1] = {'c'};
::write(control_line_fd[1], buf, 1);
// wait for the process to end gracefully
int status;
printf("waitpid result: %d\n", waitpid(child_pid, &status, 0)); // add timeout here?
}
std::optional<error> write(std::string_view to_write)
{
if (HPWS_DEBUG)
@@ -371,8 +386,8 @@ namespace hpws
// --- PARENT
close(fd[1]);
close(fd[3]);
::close(fd[1]);
::close(fd[3]);
int child_fd[2] = {fd[0], fd[2]};
@@ -481,8 +496,8 @@ namespace hpws
if (fork_child_init)
fork_child_init();
close(fd[0]);
close(fd[2]);
::close(fd[0]);
::close(fd[2]);
// dup fd[1] into fd 3
/*if (dup2(fd[1], 3) == -1)
@@ -490,8 +505,8 @@ namespace hpws
if (dup2(fd[3], 4) == -1)
perror("dup2 fd[3]");
*/
// close(fd[1]);
// close(fd[3]);
// ::close(fd[1]);
// ::close(fd[3]);
// we're assuming all fds above 3 will have close_exec flag
execv(bin_path.data(), (char *const *)argv_pass);
@@ -515,11 +530,11 @@ namespace hpws
for (int i = 0; i < 4; ++i)
{
if (fd[i] > 0)
close(fd[i]);
::close(fd[i]);
if (mapping[i] != MAP_FAILED && mapping[i] != NULL)
munmap(mapping[i], max_buffer_size);
if (buffer_fd[i] > -1)
close(buffer_fd[i]);
::close(buffer_fd[i]);
}
return error{error_code, std::string{error_msg}};
@@ -547,9 +562,9 @@ namespace hpws
if (mapping[i] != MAP_FAILED && mapping[i] != NULL)
munmap(mapping[i], max_buffer_size_);
if (i < 2 && child_fd[i] > -1)
close(child_fd[i]);
::close(child_fd[i]);
if (buffer_fd[i] > -1)
close(buffer_fd[i]);
::close(buffer_fd[i]);
}
if (pid_child > 0)
@@ -589,6 +604,13 @@ namespace hpws
std::variant<client, error> accept(const bool no_block = false)
{
static int calls = 0;
++calls;
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[0] called %d\n", calls);
#define HPWS_ACCEPT_ERROR(code, msg) \
{ \
accept_cleanup(mapping, child_fd, buffer_fd, pid); \
@@ -601,6 +623,8 @@ namespace hpws
// must not use pid_t here since we transfer across IPC channel as a uint32.
uint32_t pid = 0;
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[1] called %d\n", calls);
{
struct msghdr child_msg = {0};
memset(&child_msg, 0, sizeof(child_msg));
@@ -608,13 +632,16 @@ namespace hpws
child_msg.msg_control = cmsgbuf;
child_msg.msg_controllen = sizeof(cmsgbuf);
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[2] called %d\n", calls);
// If no-block is specified, we first check any bytes available on control fd
// before attempting to do a blocking a read.
if (no_block)
{
struct pollfd master_pfd;
master_pfd.fd = this->master_control_fd_;
master_pfd.events = POLLIN;
master_pfd.events = POLLERR | POLLHUP | POLLNVAL | POLLIN;
const int master_poll_result = poll(&master_pfd, 1, HPWS_SMALL_TIMEOUT);
if (master_poll_result == -1) // 1 ms timeout
@@ -624,6 +651,9 @@ namespace hpws
HPWS_ACCEPT_ERROR(199, "no new client available");
}
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[3] called %d\n", calls);
int bytes_read =
recvmsg(this->master_control_fd_, &child_msg, 0);
struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg);
@@ -635,16 +665,25 @@ namespace hpws
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] On accept received SCM: child_fd[0] = %d, child_fd[1] = %d\n",
child_fd[0], child_fd[1]);
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[4] called %d\n", calls);
}
// read info from child control line with a timeout
struct pollfd pfd;
int ret;
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[5] called %d\n", calls);
pfd.fd = child_fd[0]; // expect all setup messages on the hpws->hpcore controlfd (0)
pfd.events = POLLIN;
ret = poll(&pfd, 1, HPWS_SMALL_TIMEOUT); // 1 ms timeout
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[6] called %d\n", calls);
// timeout or error
if (ret < 1)
HPWS_ACCEPT_ERROR(202, "timeout waiting for hpws accept child message");
@@ -653,11 +692,17 @@ namespace hpws
if (recv(child_fd[0], (unsigned char *)(&pid), sizeof(pid), 0) < sizeof(pid))
HPWS_ACCEPT_ERROR(212, "did not receive expected 4 byte pid of child process on accept");
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[7] called %d\n", calls);
// second thing we'll receive is IP address structure of the client
addr_t buf;
int bytes_read =
recv(child_fd[0], (unsigned char *)(&buf), sizeof(buf), 0);
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[8] called %d\n", calls);
if (bytes_read < sizeof(buf))
HPWS_ACCEPT_ERROR(202, "received message on master control line was not sizeof(sockaddr_in6)");
@@ -669,12 +714,19 @@ namespace hpws
child_msg.msg_control = cmsgbuf;
child_msg.msg_controllen = sizeof(cmsgbuf);
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[9] called %d\n", calls);
int bytes_read =
recvmsg(child_fd[0], &child_msg, 0);
struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg);
if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS)
HPWS_ACCEPT_ERROR(203, "non-scm_rights message sent on accept child control line");
memcpy(&buffer_fd, CMSG_DATA(cmsg), sizeof(buffer_fd));
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[10] called %d\n", calls);
for (int i = 0; i < 4; ++i)
{
//fprintf(stderr, "scm passed buffer_fd[%d] = %d\n", i, buffer_fd[i]);
@@ -685,6 +737,8 @@ namespace hpws
if (mapping[i] == MAP_FAILED)
HPWS_ACCEPT_ERROR(204, "could not mmap scm_rights passed buffer fd");
}
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[11] called %d\n", calls);
}
{
struct pollfd pfd;
@@ -695,10 +749,19 @@ namespace hpws
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] waiting for 'r' on child_fd[%d]=%d accept\n", i, child_fd[i]);
pfd.fd = child_fd[i];
pfd.events = POLLIN;
pfd.events = POLLERR | POLLHUP | POLLNVAL | POLLIN;
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[12] called %d\n", calls);
// now we wait for a 'r' ready message or for the socket/client to die
ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout
if (!(pfd.revents & POLLIN))
HPWS_ACCEPT_ERROR(5, "could not read from client_fd");
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[12a] called %d - ret %d\n", calls, ret);
char rbuf[2];
bytes_read = recv(child_fd[i], rbuf, sizeof(rbuf), 0);
if (bytes_read < 1)
@@ -713,6 +776,9 @@ namespace hpws
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] 'r%c' received on child_fd[%d]=%d\n", rbuf[1], i, child_fd[i]);
}
if (HPWS_DEBUG)
fprintf(stderr, "[HPWS.HPP] Accept[13] called %d\n", calls);
}
return client{
@@ -739,7 +805,7 @@ namespace hpws
waitpid(server_pid_, &status, 0 /* should we use WNOHANG? */);
}
close(master_control_fd_);
::close(master_control_fd_);
}
}
@@ -820,7 +886,7 @@ namespace hpws
// --- PARENT
close(fd[1]);
::close(fd[1]);
int flags = fcntl(fd[0], F_GETFD, NULL);
if (flags < 0)
@@ -886,11 +952,11 @@ namespace hpws
if (fork_child_init)
fork_child_init();
close(fd[0]);
::close(fd[0]);
// dup fd[1] into fd 3
dup2(fd[1], 3);
close(fd[1]);
::close(fd[1]);
// we're assuming all fds above 3 will have close_exec flag
execv(bin_path.data(), (char *const *)argv_pass);
@@ -912,9 +978,9 @@ namespace hpws
waitpid(pid, &status, 0 /* should we use WNOHANG? */);
}
if (fd[0] > 0)
close(fd[0]);
::close(fd[0]);
if (fd[1] > 0)
close(fd[1]);
::close(fd[1]);
return error{error_code, std::string{error_msg}};
}