nmux: implemented select properly at the end

This commit is contained in:
ha7ilm 2017-01-12 23:52:53 +01:00
parent 2f633f1f29
commit ac392f4ead

View file

@ -149,16 +149,13 @@ int main(int argc, char* argv[])
int new_socket; int new_socket;
int highfd = 0; int highfd = 0;
int input_fd = STDIN_FILENO;
fd_set select_fds;
FD_ZERO(&select_fds);
FD_SET(listen_socket, &select_fds);
maxfd(&highfd, listen_socket); maxfd(&highfd, listen_socket);
FD_SET(input_fd, &select_fds); maxfd(&highfd, STDIN_FILENO);
maxfd(&highfd, input_fd);
fd_set select_fds;
//Set stdin and listen_socket to non-blocking //Set stdin and listen_socket to non-blocking
if(set_nonblocking(input_fd) || set_nonblocking(listen_socket)) if(set_nonblocking(STDIN_FILENO) || set_nonblocking(listen_socket))
error_exit(MSG_START "cannot set_nonblocking()"); error_exit(MSG_START "cannot set_nonblocking()");
//Create tsmpool //Create tsmpool
@ -179,17 +176,20 @@ int main(int argc, char* argv[])
for(;;) for(;;)
{ {
FD_ZERO(&select_fds);
FD_SET(listen_socket, &select_fds);
FD_SET(STDIN_FILENO, &select_fds);
if(NMUX_DEBUG) fprintf(stderr, "mainfor: selecting..."); if(NMUX_DEBUG) fprintf(stderr, "mainfor: selecting...");
//Let's wait until there is any new data to read, or any new connection! //Let's wait until there is any new data to read, or any new connection!
int select_num = select(highfd, &select_fds, NULL, NULL, NULL); int select_ret = select(highfd, &select_fds, NULL, NULL, NULL);
if(NMUX_DEBUG) fprintf(stderr, "selected.\n"); if(NMUX_DEBUG) fprintf(stderr, "selected.\n");
if(select_ret == -1) error_exit("mainfor select() error");
if(select_num == -1) error_exit("mainfor select() error");
//Is there a new client connection? //Is there a new client connection?
if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) if( FD_ISSET(listen_socket, &select_fds) && ((new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) )
{ {
select_num--; select_ret--;
if(NMUX_DEBUG) fprintf(stderr, "mainfor: accepted (socket = %d).\n", new_socket); if(NMUX_DEBUG) fprintf(stderr, "mainfor: accepted (socket = %d).\n", new_socket);
//Close all finished clients //Close all finished clients
for(int i=0;i<clients.size();i++) for(int i=0;i<clients.size();i++)
@ -224,38 +224,37 @@ int main(int argc, char* argv[])
} }
} }
if(!select_num) continue; if( FD_ISSET(STDIN_FILENO, &select_fds) )
if(index_in_current_write_buffer >= bufsize)
{ {
if(NMUX_DEBUG) fprintf(stderr, "mainfor: gwbing..."); if(index_in_current_write_buffer >= bufsize)
current_write_buffer = (unsigned char*)pool->get_write_buffer();
if(NMUX_DEBUG) fprintf(stderr, "gwbed.\nmainfor: cond broadcasting...");
pthread_cond_broadcast(&wait_condition);
if(NMUX_DEBUG) fprintf(stderr, "cond broadcasted.\n");
//Shouldn't we do it after we put data in?
// No, on get_write_buffer() actually the previous buffer is getting available
// for read for threads that wait for new data (wait on global pthead mutex
// wait_condition).
index_in_current_write_buffer = 0;
}
for(;;)
{
if(NMUX_DEBUG) fprintf(stderr, "mainfor: reading...\n");
int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer);
if(NMUX_DEBUG) fprintf(stderr, "read %d\n", retval);
if(retval>0)
{ {
index_in_current_write_buffer += retval; if(NMUX_DEBUG) fprintf(stderr, "mainfor: gwbing...");
break; current_write_buffer = (unsigned char*)pool->get_write_buffer();
if(NMUX_DEBUG) fprintf(stderr, "gwbed.\nmainfor: cond broadcasting...");
pthread_cond_broadcast(&wait_condition);
if(NMUX_DEBUG) fprintf(stderr, "cond broadcasted.\n");
//Shouldn't we do it after we put data in?
// No, on get_write_buffer() actually the previous buffer is getting available
// for read for threads that wait for new data (wait on global pthead mutex
// wait_condition).
index_in_current_write_buffer = 0;
} }
else if(retval==0)
if(NMUX_DEBUG) fprintf(stderr, "mainfor: reading...\n");
int read_ret = read(STDIN_FILENO, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer);
if(NMUX_DEBUG) fprintf(stderr, "read %d\n", read_ret);
if(read_ret>0)
{
index_in_current_write_buffer += read_ret;
}
else if(read_ret==0)
{ {
//End of input stream, close clients and exit //End of input stream, close clients and exit
print_exit(MSG_START "(main thread/for) end input stream, exiting.\n"); print_exit(MSG_START "(main thread/for) end input stream, exiting.\n");
} }
else if(retval==-1) else if(read_ret==-1)
{ {
if(errno == EAGAIN) { if(NMUX_DEBUG) fprintf(stderr, "mainfor: read EAGAIN\n"); /* seems like select would block forever, so we just read again */ break; } if(errno == EAGAIN) { if(NMUX_DEBUG) fprintf(stderr, "mainfor: read EAGAIN\n"); /* seems like select would block forever, so we just read again */ }
else error_exit(MSG_START "(main thread/for) error in read(), exiting.\n"); else error_exit(MSG_START "(main thread/for) error in read(), exiting.\n");
} }
} }