diff --git a/nmux.cpp b/nmux.cpp index 56f650b..e2e200e 100644 --- a/nmux.cpp +++ b/nmux.cpp @@ -96,7 +96,7 @@ int main(int argc, char* argv[]) case '?': case ':': default: - print_exit(MSG_START "(main thread) error in getopt_long()\n"); + print_exit(MSG_START "error in getopt_long()\n"); } } @@ -123,7 +123,7 @@ int main(int argc, char* argv[]) int sockopt = 1; if( setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)) == -1 ) - error_exit(MSG_START "(main thread) cannot set SO_REUSEADDR"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453 + error_exit(MSG_START "cannot set SO_REUSEADDR"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453 memset(&addr_host,'0',sizeof(addr_host)); addr_host.sin_family = AF_INET; @@ -131,15 +131,15 @@ int main(int argc, char* argv[]) addr_host.sin_addr.s_addr = INADDR_ANY; if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE ) - error_exit(MSG_START "(main thread) invalid host address"); + error_exit(MSG_START "invalid host address"); if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 ) - error_exit(MSG_START "(main thread) cannot bind() address to the socket"); + error_exit(MSG_START "cannot bind() address to the socket"); if( listen(listen_socket, 10) == -1 ) - error_exit(MSG_START "(main thread) cannot listen() on socket"); + error_exit(MSG_START "cannot listen() on socket"); - fprintf(stderr,MSG_START "(main thread) listening on %s:%d\n", inet_ntoa(addr_host.sin_addr), host_port); + fprintf(stderr, MSG_START "listening on %s:%d\n", inet_ntoa(addr_host.sin_addr), host_port); struct sockaddr_in addr_cli; socklen_t addr_cli_len = sizeof(addr_cli); @@ -156,11 +156,11 @@ int main(int argc, char* argv[]) //Set stdin and listen_socket to non-blocking if(set_nonblocking(input_fd) || set_nonblocking(listen_socket)) - error_exit(MSG_START "(main thread) cannot set_nonblocking()"); + error_exit(MSG_START "cannot set_nonblocking()"); //Create tsmpool pool = new tsmpool(bufsize, bufcnt); - if(!pool->is_ok()) print_exit(MSG_START "(main thread) tsmpool failed to initialize\n"); + if(!pool->is_ok()) print_exit(MSG_START "tsmpool failed to initialize\n"); unsigned char* current_write_buffer = (unsigned char*)pool->get_write_buffer(); int index_in_current_write_buffer = 0; @@ -170,25 +170,25 @@ int main(int argc, char* argv[]) // data arrives. pthread_cond_t* wait_condition = new pthread_cond_t; if(pthread_cond_init(wait_condition, NULL)) - print_exit(MSG_START "(main thread) pthread_cond_init failed"); //cond_attrs is ignored by Linux + print_exit(MSG_START "pthread_cond_init failed"); //cond_attrs is ignored by Linux for(;;) { - fprintf(stderr, "mainfor: selecting..."); + if(NMUX_DEBUG) fprintf(stderr, "mainfor: selecting..."); //Let's wait until there is any new data to read, or any new connection! select(highfd, &select_fds, NULL, NULL, NULL); - fprintf(stderr, "selected.\n"); + if(NMUX_DEBUG) fprintf(stderr, "selected.\n"); //Is there a new client connection? if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) { - fprintf(stderr, "mainfor: accepted.\n"); + if(NMUX_DEBUG) fprintf(stderr, "mainfor: accepted (%d).\n", new_socket); //Close all finished clients for(int i=0;istatus == CS_THREAD_FINISHED) { - fprintf(stderr, "mainfor: client removed: %d\n", i); + if(NMUX_DEBUG) fprintf(stderr, "mainfor: client removed: %d\n", i); client_erase(clients[i]); clients.erase(clients.begin()+i); } @@ -205,14 +205,14 @@ int main(int argc, char* argv[]) pthread_mutex_init(&new_client->wait_mutex, NULL); new_client->wait_condition = wait_condition; new_client->sleeping = 0; - if(pthread_create(&new_client->thread, NULL, client_thread , (void*)&new_client)==0) + if(pthread_create(&new_client->thread, NULL, client_thread, (void*)&new_client)==0) { clients.push_back(new_client); - fprintf(stderr, MSG_START "(main thread) pthread_create() done, clients now: %d\n", clients.size()); + fprintf(stderr, MSG_START "pthread_create() done, clients now: %d\n", clients.size()); } else { - fprintf(stderr, MSG_START "(main thread) pthread_create() failed.\n"); + fprintf(stderr, MSG_START "pthread_create() failed.\n"); pool->remove_thread(new_client->tsmthread); pthread_mutex_destroy(&new_client->wait_mutex); delete new_client; @@ -230,7 +230,7 @@ int main(int argc, char* argv[]) index_in_current_write_buffer = 0; } int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer); - fprintf(stderr, "mainfor: read %d\n", retval); + if(NMUX_DEBUG) fprintf(stderr, "mainfor: read %d\n", retval); if(retval>0) { index_in_current_write_buffer += retval; @@ -261,23 +261,21 @@ void* client_thread (void* param) this_client->status = CS_THREAD_RUNNING; int retval; tsmpool* lpool = this_client->lpool; + if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: socket = %d!\n", (unsigned)param, this_client->socket); - fprintf(stderr, "client 0x%x: make maxfd... ", (unsigned)param); - fd_set client_select_fds; - fprintf(stderr, "1"); - int client_highfd = 0; - FD_ZERO(&client_select_fds); - fprintf(stderr, "2 %d <= %d ", this_client->socket, FD_SETSIZE); - FD_SET(this_client->socket, &client_select_fds); - fprintf(stderr, "3"); - maxfd(&client_highfd, this_client->socket); - fprintf(stderr, "done.\n"); + if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: poll init...", (unsigned)param); + struct pollfd pollfds[1]; + pollfds[0].fd = this_client->socket; + pollfds[0].events = POLLOUT; + pollfds[0].revents = 0; + if(NMUX_DEBUG) fprintf(stderr, "done.\n"); - //Set client->socket to non-blocking + //Set this_client->socket to non-blocking if(set_nonblocking(this_client->socket)) - error_exit(MSG_START "cannot set_nonblocking() on client->socket"); + error_exit(MSG_START "cannot set_nonblocking() on this_client->socket"); int client_buffer_index = 0; + int client_goto_source = 0; char* pool_read_buffer = NULL; for(;;) @@ -287,7 +285,7 @@ void* client_thread (void* param) // (Wait for the server process to wake me up.) while(!pool_read_buffer || client_buffer_index >= lpool->size) { - fprintf(stderr, "client 0x%x: waiting\n", (unsigned)param); + if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: cond_waiting for more data\n", (unsigned)param); char* pool_read_buffer = (char*)lpool->get_read_buffer(this_client->tsmthread); pthread_mutex_lock(&this_client->wait_mutex); this_client->sleeping = 1; @@ -295,20 +293,22 @@ void* client_thread (void* param) } //Wait for the socket to be available for write. - fprintf(stderr, "client 0x%x: selecting...", (unsigned)param); - select(client_highfd, NULL, &client_select_fds, NULL, NULL); - fprintf(stderr, "done.\n"); + if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: polling for socket write...", (unsigned)param); + int ret = poll(pollfds, 1, -1); + if(NMUX_DEBUG) fprintf(stderr, "done.\n"); + if(ret == 0) continue; + else if (ret == -1) { client_goto_source = 1; goto client_thread_exit; } //Read data from global tsmpool and write it to client socket - fprintf(stderr, "client 0x%x: sending...", (unsigned)param); - int ret = send(this_client->socket, pool_read_buffer + client_buffer_index, lpool->size - client_buffer_index, 0); - fprintf(stderr, "done.\n"); + if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: sending...", (unsigned)param); + ret = send(this_client->socket, pool_read_buffer + client_buffer_index, lpool->size - client_buffer_index, 0); + if(NMUX_DEBUG) fprintf(stderr, "done.\n"); if(ret == -1) { switch(errno) { case EAGAIN: break; - default: goto client_thread_exit; + default: client_goto_source = 2; goto client_thread_exit; } } else client_buffer_index += ret; @@ -316,7 +316,7 @@ void* client_thread (void* param) client_thread_exit: this_client->status = CS_THREAD_FINISHED; - fprintf(stderr, "CS_THREAD_FINISHED"); //Debug + fprintf(stderr, "client 0x%x: CS_THREAD_FINISHED, client_goto_source = %d, errno = %d", (unsigned)param, client_goto_source, errno); pthread_exit(NULL); return NULL; } diff --git a/nmux.h b/nmux.h index 8efaf5a..c8d875c 100644 --- a/nmux.h +++ b/nmux.h @@ -7,12 +7,14 @@ #include #include #include +#include #include #include #include #include "tsmpool.h" #define MSG_START "nmux: " +#define NMUX_DEBUG 1 typedef enum client_status_e {