nmux: client now uses poll; more messages are printed if NMUX_DEBUG is on

This commit is contained in:
ha7ilm 2017-01-12 16:24:59 +01:00
parent 47084804da
commit c2058aa34e
2 changed files with 40 additions and 38 deletions

View file

@ -96,7 +96,7 @@ int main(int argc, char* argv[])
case '?': case '?':
case ':': case ':':
default: default:
print_exit(MSG_START "(main thread) error in getopt_long()\n"); print_exit(MSG_START "error in getopt_long()\n");
} }
} }
@ -123,7 +123,7 @@ int main(int argc, char* argv[])
int sockopt = 1; int sockopt = 1;
if( setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)) == -1 ) if( setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)) == -1 )
error_exit(MSG_START "(main thread) cannot set SO_REUSEADDR"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453 error_exit(MSG_START "cannot set SO_REUSEADDR"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453
memset(&addr_host,'0',sizeof(addr_host)); memset(&addr_host,'0',sizeof(addr_host));
addr_host.sin_family = AF_INET; addr_host.sin_family = AF_INET;
@ -131,15 +131,15 @@ int main(int argc, char* argv[])
addr_host.sin_addr.s_addr = INADDR_ANY; addr_host.sin_addr.s_addr = INADDR_ANY;
if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE ) if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE )
error_exit(MSG_START "(main thread) invalid host address"); error_exit(MSG_START "invalid host address");
if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 ) if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 )
error_exit(MSG_START "(main thread) cannot bind() address to the socket"); error_exit(MSG_START "cannot bind() address to the socket");
if( listen(listen_socket, 10) == -1 ) if( listen(listen_socket, 10) == -1 )
error_exit(MSG_START "(main thread) cannot listen() on socket"); error_exit(MSG_START "cannot listen() on socket");
fprintf(stderr,MSG_START "(main thread) listening on %s:%d\n", inet_ntoa(addr_host.sin_addr), host_port); fprintf(stderr, MSG_START "listening on %s:%d\n", inet_ntoa(addr_host.sin_addr), host_port);
struct sockaddr_in addr_cli; struct sockaddr_in addr_cli;
socklen_t addr_cli_len = sizeof(addr_cli); socklen_t addr_cli_len = sizeof(addr_cli);
@ -156,11 +156,11 @@ int main(int argc, char* argv[])
//Set stdin and listen_socket to non-blocking //Set stdin and listen_socket to non-blocking
if(set_nonblocking(input_fd) || set_nonblocking(listen_socket)) if(set_nonblocking(input_fd) || set_nonblocking(listen_socket))
error_exit(MSG_START "(main thread) cannot set_nonblocking()"); error_exit(MSG_START "cannot set_nonblocking()");
//Create tsmpool //Create tsmpool
pool = new tsmpool(bufsize, bufcnt); pool = new tsmpool(bufsize, bufcnt);
if(!pool->is_ok()) print_exit(MSG_START "(main thread) tsmpool failed to initialize\n"); if(!pool->is_ok()) print_exit(MSG_START "tsmpool failed to initialize\n");
unsigned char* current_write_buffer = (unsigned char*)pool->get_write_buffer(); unsigned char* current_write_buffer = (unsigned char*)pool->get_write_buffer();
int index_in_current_write_buffer = 0; int index_in_current_write_buffer = 0;
@ -170,25 +170,25 @@ int main(int argc, char* argv[])
// data arrives. // data arrives.
pthread_cond_t* wait_condition = new pthread_cond_t; pthread_cond_t* wait_condition = new pthread_cond_t;
if(pthread_cond_init(wait_condition, NULL)) if(pthread_cond_init(wait_condition, NULL))
print_exit(MSG_START "(main thread) pthread_cond_init failed"); //cond_attrs is ignored by Linux print_exit(MSG_START "pthread_cond_init failed"); //cond_attrs is ignored by Linux
for(;;) for(;;)
{ {
fprintf(stderr, "mainfor: selecting..."); if(NMUX_DEBUG) fprintf(stderr, "mainfor: selecting...");
//Let's wait until there is any new data to read, or any new connection! //Let's wait until there is any new data to read, or any new connection!
select(highfd, &select_fds, NULL, NULL, NULL); select(highfd, &select_fds, NULL, NULL, NULL);
fprintf(stderr, "selected.\n"); if(NMUX_DEBUG) fprintf(stderr, "selected.\n");
//Is there a new client connection? //Is there a new client connection?
if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1)
{ {
fprintf(stderr, "mainfor: accepted.\n"); if(NMUX_DEBUG) fprintf(stderr, "mainfor: accepted (%d).\n", new_socket);
//Close all finished clients //Close all finished clients
for(int i=0;i<clients.size();i++) for(int i=0;i<clients.size();i++)
{ {
if(clients[i]->status == CS_THREAD_FINISHED) if(clients[i]->status == CS_THREAD_FINISHED)
{ {
fprintf(stderr, "mainfor: client removed: %d\n", i); if(NMUX_DEBUG) fprintf(stderr, "mainfor: client removed: %d\n", i);
client_erase(clients[i]); client_erase(clients[i]);
clients.erase(clients.begin()+i); clients.erase(clients.begin()+i);
} }
@ -205,14 +205,14 @@ int main(int argc, char* argv[])
pthread_mutex_init(&new_client->wait_mutex, NULL); pthread_mutex_init(&new_client->wait_mutex, NULL);
new_client->wait_condition = wait_condition; new_client->wait_condition = wait_condition;
new_client->sleeping = 0; new_client->sleeping = 0;
if(pthread_create(&new_client->thread, NULL, client_thread , (void*)&new_client)==0) if(pthread_create(&new_client->thread, NULL, client_thread, (void*)&new_client)==0)
{ {
clients.push_back(new_client); clients.push_back(new_client);
fprintf(stderr, MSG_START "(main thread) pthread_create() done, clients now: %d\n", clients.size()); fprintf(stderr, MSG_START "pthread_create() done, clients now: %d\n", clients.size());
} }
else else
{ {
fprintf(stderr, MSG_START "(main thread) pthread_create() failed.\n"); fprintf(stderr, MSG_START "pthread_create() failed.\n");
pool->remove_thread(new_client->tsmthread); pool->remove_thread(new_client->tsmthread);
pthread_mutex_destroy(&new_client->wait_mutex); pthread_mutex_destroy(&new_client->wait_mutex);
delete new_client; delete new_client;
@ -230,7 +230,7 @@ int main(int argc, char* argv[])
index_in_current_write_buffer = 0; index_in_current_write_buffer = 0;
} }
int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer); int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer);
fprintf(stderr, "mainfor: read %d\n", retval); if(NMUX_DEBUG) fprintf(stderr, "mainfor: read %d\n", retval);
if(retval>0) if(retval>0)
{ {
index_in_current_write_buffer += retval; index_in_current_write_buffer += retval;
@ -261,23 +261,21 @@ void* client_thread (void* param)
this_client->status = CS_THREAD_RUNNING; this_client->status = CS_THREAD_RUNNING;
int retval; int retval;
tsmpool* lpool = this_client->lpool; tsmpool* lpool = this_client->lpool;
if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: socket = %d!\n", (unsigned)param, this_client->socket);
fprintf(stderr, "client 0x%x: make maxfd... ", (unsigned)param); if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: poll init...", (unsigned)param);
fd_set client_select_fds; struct pollfd pollfds[1];
fprintf(stderr, "1"); pollfds[0].fd = this_client->socket;
int client_highfd = 0; pollfds[0].events = POLLOUT;
FD_ZERO(&client_select_fds); pollfds[0].revents = 0;
fprintf(stderr, "2 %d <= %d ", this_client->socket, FD_SETSIZE); if(NMUX_DEBUG) fprintf(stderr, "done.\n");
FD_SET(this_client->socket, &client_select_fds);
fprintf(stderr, "3");
maxfd(&client_highfd, this_client->socket);
fprintf(stderr, "done.\n");
//Set client->socket to non-blocking //Set this_client->socket to non-blocking
if(set_nonblocking(this_client->socket)) if(set_nonblocking(this_client->socket))
error_exit(MSG_START "cannot set_nonblocking() on client->socket"); error_exit(MSG_START "cannot set_nonblocking() on this_client->socket");
int client_buffer_index = 0; int client_buffer_index = 0;
int client_goto_source = 0;
char* pool_read_buffer = NULL; char* pool_read_buffer = NULL;
for(;;) for(;;)
@ -287,7 +285,7 @@ void* client_thread (void* param)
// (Wait for the server process to wake me up.) // (Wait for the server process to wake me up.)
while(!pool_read_buffer || client_buffer_index >= lpool->size) while(!pool_read_buffer || client_buffer_index >= lpool->size)
{ {
fprintf(stderr, "client 0x%x: waiting\n", (unsigned)param); if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: cond_waiting for more data\n", (unsigned)param);
char* pool_read_buffer = (char*)lpool->get_read_buffer(this_client->tsmthread); char* pool_read_buffer = (char*)lpool->get_read_buffer(this_client->tsmthread);
pthread_mutex_lock(&this_client->wait_mutex); pthread_mutex_lock(&this_client->wait_mutex);
this_client->sleeping = 1; this_client->sleeping = 1;
@ -295,20 +293,22 @@ void* client_thread (void* param)
} }
//Wait for the socket to be available for write. //Wait for the socket to be available for write.
fprintf(stderr, "client 0x%x: selecting...", (unsigned)param); if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: polling for socket write...", (unsigned)param);
select(client_highfd, NULL, &client_select_fds, NULL, NULL); int ret = poll(pollfds, 1, -1);
fprintf(stderr, "done.\n"); if(NMUX_DEBUG) fprintf(stderr, "done.\n");
if(ret == 0) continue;
else if (ret == -1) { client_goto_source = 1; goto client_thread_exit; }
//Read data from global tsmpool and write it to client socket //Read data from global tsmpool and write it to client socket
fprintf(stderr, "client 0x%x: sending...", (unsigned)param); if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: sending...", (unsigned)param);
int ret = send(this_client->socket, pool_read_buffer + client_buffer_index, lpool->size - client_buffer_index, 0); ret = send(this_client->socket, pool_read_buffer + client_buffer_index, lpool->size - client_buffer_index, 0);
fprintf(stderr, "done.\n"); if(NMUX_DEBUG) fprintf(stderr, "done.\n");
if(ret == -1) if(ret == -1)
{ {
switch(errno) switch(errno)
{ {
case EAGAIN: break; case EAGAIN: break;
default: goto client_thread_exit; default: client_goto_source = 2; goto client_thread_exit;
} }
} }
else client_buffer_index += ret; else client_buffer_index += ret;
@ -316,7 +316,7 @@ void* client_thread (void* param)
client_thread_exit: client_thread_exit:
this_client->status = CS_THREAD_FINISHED; this_client->status = CS_THREAD_FINISHED;
fprintf(stderr, "CS_THREAD_FINISHED"); //Debug fprintf(stderr, "client 0x%x: CS_THREAD_FINISHED, client_goto_source = %d, errno = %d", (unsigned)param, client_goto_source, errno);
pthread_exit(NULL); pthread_exit(NULL);
return NULL; return NULL;
} }

2
nmux.h
View file

@ -7,12 +7,14 @@
#include <pthread.h> #include <pthread.h>
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <poll.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/in.h> #include <netinet/in.h>
#include "tsmpool.h" #include "tsmpool.h"
#define MSG_START "nmux: " #define MSG_START "nmux: "
#define NMUX_DEBUG 1
typedef enum client_status_e typedef enum client_status_e
{ {