nmux: things start to work

This commit is contained in:
ha7ilm 2017-01-12 16:56:54 +01:00
parent c2058aa34e
commit 1177e03660
3 changed files with 18 additions and 8 deletions

View file

@ -174,15 +174,15 @@ int main(int argc, char* argv[])
for(;;) for(;;)
{ {
if(NMUX_DEBUG) fprintf(stderr, "mainfor: selecting..."); // if(NMUX_DEBUG) fprintf(stderr, "mainfor: selecting...");
//Let's wait until there is any new data to read, or any new connection! //Let's wait until there is any new data to read, or any new connection!
select(highfd, &select_fds, NULL, NULL, NULL); select(highfd, &select_fds, NULL, NULL, NULL);
if(NMUX_DEBUG) fprintf(stderr, "selected.\n"); // if(NMUX_DEBUG) fprintf(stderr, "selected.\n");
//Is there a new client connection? //Is there a new client connection?
if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1)
{ {
if(NMUX_DEBUG) fprintf(stderr, "mainfor: accepted (%d).\n", new_socket); if(NMUX_DEBUG) fprintf(stderr, "mainfor: accepted (socket = %d).\n", new_socket);
//Close all finished clients //Close all finished clients
for(int i=0;i<clients.size();i++) for(int i=0;i<clients.size();i++)
{ {
@ -205,7 +205,7 @@ int main(int argc, char* argv[])
pthread_mutex_init(&new_client->wait_mutex, NULL); pthread_mutex_init(&new_client->wait_mutex, NULL);
new_client->wait_condition = wait_condition; new_client->wait_condition = wait_condition;
new_client->sleeping = 0; new_client->sleeping = 0;
if(pthread_create(&new_client->thread, NULL, client_thread, (void*)&new_client)==0) if(pthread_create(&new_client->thread, NULL, client_thread, (void*)new_client)==0)
{ {
clients.push_back(new_client); clients.push_back(new_client);
fprintf(stderr, MSG_START "pthread_create() done, clients now: %d\n", clients.size()); fprintf(stderr, MSG_START "pthread_create() done, clients now: %d\n", clients.size());
@ -230,7 +230,7 @@ int main(int argc, char* argv[])
index_in_current_write_buffer = 0; index_in_current_write_buffer = 0;
} }
int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer); int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer);
if(NMUX_DEBUG) fprintf(stderr, "mainfor: read %d\n", retval); // if(NMUX_DEBUG) fprintf(stderr, "mainfor: read %d\n", retval);
if(retval>0) if(retval>0)
{ {
index_in_current_write_buffer += retval; index_in_current_write_buffer += retval;
@ -242,7 +242,8 @@ int main(int argc, char* argv[])
} }
else if(retval==-1) else if(retval==-1)
{ {
error_exit(MSG_START "(main thread/for) error in read(), exiting.\n"); if(errno == EAGAIN) { if(NMUX_DEBUG) fprintf(stderr, "mainfor: read %d\n", retval); }
else error_exit(MSG_START "(main thread/for) error in read(), exiting.\n");
} }
} }
} }
@ -286,7 +287,8 @@ void* client_thread (void* param)
while(!pool_read_buffer || client_buffer_index >= lpool->size) while(!pool_read_buffer || client_buffer_index >= lpool->size)
{ {
if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: cond_waiting for more data\n", (unsigned)param); if(NMUX_DEBUG) fprintf(stderr, "client 0x%x: cond_waiting for more data\n", (unsigned)param);
char* pool_read_buffer = (char*)lpool->get_read_buffer(this_client->tsmthread); pool_read_buffer = (char*)lpool->get_read_buffer(this_client->tsmthread);
if(pool_read_buffer) { client_buffer_index = 0; break; }
pthread_mutex_lock(&this_client->wait_mutex); pthread_mutex_lock(&this_client->wait_mutex);
this_client->sleeping = 1; this_client->sleeping = 1;
pthread_cond_wait(this_client->wait_condition, &this_client->wait_mutex); pthread_cond_wait(this_client->wait_condition, &this_client->wait_mutex);

View file

@ -27,6 +27,7 @@ void* tsmpool::get_write_buffer()
void* to_return = buffers[write_index]; void* to_return = buffers[write_index];
write_index = index_next(write_index); write_index = index_next(write_index);
pthread_mutex_unlock(&this->mutex); pthread_mutex_unlock(&this->mutex);
fprintf(stderr, "gwb: write_index = %d\n", write_index);
return to_return; return to_return;
} }
@ -58,9 +59,15 @@ void* tsmpool::get_read_buffer(tsmthread_t* thread)
{ {
pthread_mutex_lock(&this->mutex); pthread_mutex_lock(&this->mutex);
int* actual_read_index = (thread==NULL) ? &my_read_index : &thread->read_index; int* actual_read_index = (thread==NULL) ? &my_read_index : &thread->read_index;
if(*actual_read_index==index_before(write_index)) return NULL; if(*actual_read_index==index_before(write_index))
{
fprintf(stderr, "grb: fail,"
"read_index %d is just before write_index\n", *actual_read_index);
return NULL;
}
void* to_return = buffers[*actual_read_index]; void* to_return = buffers[*actual_read_index];
*actual_read_index=index_next(*actual_read_index); *actual_read_index=index_next(*actual_read_index);
pthread_mutex_unlock(&this->mutex); pthread_mutex_unlock(&this->mutex);
fprintf(stderr, "grb: read_index = %d\n", *actual_read_index);
return to_return; return to_return;
} }

View file

@ -3,6 +3,7 @@
//It implements a big circular buffer that one thread writes into, and multiple threads read from. //It implements a big circular buffer that one thread writes into, and multiple threads read from.
//The reader threads have lower priority than the writer thread (they can be left behind if the don't read fast enough). //The reader threads have lower priority than the writer thread (they can be left behind if the don't read fast enough).
#include <stdio.h>
#include <vector> #include <vector>
#include <pthread.h> #include <pthread.h>