diff --git a/Makefile b/Makefile index c9bafbe..06f0709 100644 --- a/Makefile +++ b/Makefile @@ -26,8 +26,6 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - LIBSOURCES = fft_fftw.c libcsdr_wrapper.c #SOURCES = csdr.c $(LIBSOURCES) cpufeature = $(if $(findstring $(1),$(shell cat /proc/cpuinfo)),$(2)) @@ -41,6 +39,8 @@ PARAMS_LOOPVECT = -O3 -ffast-math -fdump-tree-vect-details -dumpbase dumpvect PARAMS_LIBS = -g -lm -lrt -lfftw3f -DUSE_FFTW -DLIBCSDR_GPL -DUSE_IMA_ADPCM PARAMS_SO = -fpic PARAMS_MISC = -Wno-unused-result +#DEBUG_ON = 0 #debug is always on by now (anyway it could be compiled with `make DEBUG_ON=1`) +#PARAMS_DEBUG = $(if $(DEBUG_ON),-g,) FFTW_PACKAGE = fftw-3.3.3 .PHONY: clean-vect clean @@ -56,20 +56,21 @@ csdr: csdr.c libcsdr.so gcc -std=gnu99 $(PARAMS_LOOPVECT) $(PARAMS_SIMD) csdr.c $(PARAMS_LIBS) -L. -lcsdr $(PARAMS_MISC) -o csdr ddcd: ddcd.cpp libcsdr.so ddcd.h g++ $(PARAMS_LOOPVECT) $(PARAMS_SIMD) ddcd.cpp $(PARAMS_LIBS) -L. -lcsdr -lpthread $(PARAMS_MISC) -o ddcd -nmux: nmux.cpp libcsdr.so nmux.h - g++ $(PARAMS_LOOPVECT) $(PARAMS_SIMD) nmux.cpp $(PARAMS_LIBS) -L. -lcsdr -lpthread $(PARAMS_MISC) -o nmux +nmux: nmux.cpp libcsdr.so nmux.h tsmpool.cpp tsmpool.h + g++ $(PARAMS_LOOPVECT) $(PARAMS_SIMD) nmux.cpp tsmpool.cpp $(PARAMS_LIBS) -L. -lcsdr -lpthread $(PARAMS_MISC) -o nmux arm-cross: clean-vect #note: this doesn't work since having added FFTW arm-linux-gnueabihf-gcc -std=gnu99 -O3 -fshort-double -ffast-math -dumpbase dumpvect-arm -fdump-tree-vect-details -mfloat-abi=softfp -march=armv7-a -mtune=cortex-a9 -mfpu=neon -mvectorize-with-neon-quad -Wno-unused-result -Wformat=0 $(SOURCES) -lm -o ./csdr clean-vect: rm -f dumpvect*.vect clean: clean-vect - rm -f libcsdr.so csdr ddcd + rm -f libcsdr.so csdr ddcd nmux install: all install -m 0755 libcsdr.so /usr/lib install -m 0755 csdr /usr/bin install -m 0755 csdr-fm /usr/bin - install -m 0755 ddcd /usr/bin + install -m 0755 nmux /usr/bin + -install -m 0755 ddcd /usr/bin ldconfig uninstall: rm /usr/lib/libcsdr.so /usr/bin/csdr /usr/bin/csdr-fm diff --git a/nmux.cpp b/nmux.cpp index 4225678..7a3ee3e 100644 --- a/nmux.cpp +++ b/nmux.cpp @@ -30,6 +30,11 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "nmux.h" +char help_text[]="nmux is a TCP stream multiplexer. It reads data from the standard input, and sends it to each client connected through TCP sockets. Available command line options are:\n" +"\t--port (-p), --address (-a): TCP port and address to listen.\n" +"\t--bufsize (-b), --bufcnt (-n): Internal buffer size and count.\n" +"\t--help (-h): Show this message.\n"; + int host_port = 0; char host_address[100] = "127.0.0.1"; int thread_cntr = 0; @@ -54,6 +59,7 @@ int main(int argc, char* argv[]) global_argv = argv; global_argc = argc; int c; + int no_options = 1; for(;;) { int option_index = 0; @@ -61,10 +67,13 @@ int main(int argc, char* argv[]) {"port", required_argument, 0, 'p' }, {"address", required_argument, 0, 'a' }, {"bufsize", required_argument, 0, 'b' }, - {"bufcnt", required_argument, 0, 'n' } + {"bufcnt", required_argument, 0, 'n' }, + {"help", no_argument, 0, 'h' }, + {0, 0, 0, 0 } }; - c = getopt_long(argc, argv, "p:a:b:n:", long_options, &option_index); + c = getopt_long(argc, argv, "p:a:b:n:h", long_options, &option_index); if(c==-1) break; + no_options = 0; switch (c) { case 'a': @@ -80,14 +89,18 @@ int main(int argc, char* argv[]) case 'n': bufcnt=atoi(optarg); break; + case 'h': + print_exit(help_text); + break; case 0: case '?': case ':': - default:; - print_exit(MSG_START "error in getopt_long()\n"); + default: + print_exit(MSG_START "(main thread) error in getopt_long()\n"); } } + if(no_options) print_exit(help_text); if(!host_port) print_exit(MSG_START "missing required command line argument, --port.\n"); if(bufsize<0) print_exit(MSG_START "invalid value for --bufsize (should be >0)\n"); if(bufcnt<0) print_exit(MSG_START "invalid value for --bufcnt (should be >0)\n"); @@ -110,7 +123,7 @@ int main(int argc, char* argv[]) int sockopt = 1; if( setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)) == -1 ) - error_exit(MSG_START "cannot set SO_REUSEADDR"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453 + error_exit(MSG_START "(main thread) cannot set SO_REUSEADDR"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453 memset(&addr_host,'0',sizeof(addr_host)); addr_host.sin_family = AF_INET; @@ -118,15 +131,15 @@ int main(int argc, char* argv[]) addr_host.sin_addr.s_addr = INADDR_ANY; if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE ) - error_exit(MSG_START "invalid host address"); + error_exit(MSG_START "(main thread) invalid host address"); if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 ) - error_exit(MSG_START "cannot bind() address to the socket"); + error_exit(MSG_START "(main thread) cannot bind() address to the socket"); if( listen(listen_socket, 10) == -1 ) - error_exit(MSG_START "cannot listen() on socket"); + error_exit(MSG_START "(main thread) cannot listen() on socket"); - fprintf(stderr,MSG_START "listening on %s:%d\n", inet_ntoa(addr_host.sin_addr), host_port); + fprintf(stderr,MSG_START "(main thread) listening on %s:%d\n", inet_ntoa(addr_host.sin_addr), host_port); struct sockaddr_in addr_cli; socklen_t addr_cli_len = sizeof(addr_cli); @@ -143,13 +156,13 @@ int main(int argc, char* argv[]) //Set stdin and listen_socket to non-blocking if(set_nonblocking(input_fd) || set_nonblocking(listen_socket)) - error_exit(MSG_START "cannot set_nonblocking()"); + error_exit(MSG_START "(main thread) cannot set_nonblocking()"); //Create tsmpool pool = new tsmpool(bufsize, bufcnt); - if(!pool->ok) print_exit(MSG_START "tsmpool failed to initialize\n"); + if(!pool->is_ok()) print_exit(MSG_START "(main thread) tsmpool failed to initialize\n"); - unsigned char* current_write_buffer = pool->get_write_buffer(); + unsigned char* current_write_buffer = (unsigned char*)pool->get_write_buffer(); int index_in_current_write_buffer = 0; //Create wait condition: client threads waiting for input data from the main thread will be @@ -157,7 +170,7 @@ int main(int argc, char* argv[]) // data arrives. pthread_cond_t* wait_condition = new pthread_cond_t; if(!pthread_cond_init(wait_condition, NULL)) - print_exit(MSG_START "pthread_cond_init failed"); //cond_attrs is ignored by Linux + print_exit(MSG_START "(main thread) pthread_cond_init failed"); //cond_attrs is ignored by Linux for(;;) { @@ -167,7 +180,15 @@ int main(int argc, char* argv[]) //Is there a new client connection? if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) { - clients_close_all_finished(); + //Close all finished clients + for(int i=0;istatus == CS_THREAD_FINISHED) + { + client_erase(clients[i]); + clients.erase(clients.begin()+i); + } + } //We're the parent, let's create a new client and initialize it client_t* new_client = new client_t; @@ -183,17 +204,17 @@ int main(int argc, char* argv[]) if(pthread_create(&new_client->thread, NULL, client_thread , (void*)&new_client)<0) { clients.push_back(new_client); - fprintf(stderr, MSG_START "pthread_create() done, clients now: %d\n", clients.size()); + fprintf(stderr, MSG_START "(main thread) pthread_create() done, clients now: %d\n", clients.size()); } else { - fprintf(stderr, MSG_START "pthread_create() failed.\n"); + fprintf(stderr, MSG_START "(main thread) pthread_create() failed.\n"); } } if(index_in_current_write_buffer >= bufsize) { - current_write_buffer = pool->get_write_buffer(); + current_write_buffer = (unsigned char*)pool->get_write_buffer(); pthread_cond_broadcast(wait_condition); //Shouldn't we do it after we put data in? // No, on get_write_buffer() actually the previous buffer is getting available @@ -209,30 +230,22 @@ int main(int argc, char* argv[]) else if(retval==0) { //End of input stream, close clients and exit - print_exit(MSG_START "end of input stream, exiting.\n") + print_exit(MSG_START "(main thread/for) end input stream, exiting.\n"); + } + else if(retval==-1) + { + error_exit(MSG_START "(main thread/for) error in read(), exiting.\n"); } } } void client_erase(client_t* client) { - pthread_mutex_destroy(client->wait_mutex); + pthread_mutex_destroy(&client->wait_mutex); pthread_cond_destroy(client->wait_condition); pool->remove_thread(client->tsmthread); } -void clients_close_all_finished() -{ - for(int i=0;istatus == CS_THREAD_FINISHED) - { - client_erase(clients[i]); - clients.erase(i); - } - } -} - void* client_thread (void* param) { client_t* this_client = (client_t*)param; @@ -243,11 +256,11 @@ void* client_thread (void* param) fd_set client_select_fds; int client_highfd = 0; FD_ZERO(&client_select_fds); - FD_SET(client->socket, &client_select_fds); - maxfd(&client_highfd, client->socket); + FD_SET(this_client->socket, &client_select_fds); + maxfd(&client_highfd, this_client->socket); //Set client->socket to non-blocking - if(set_nonblocking(client->socket)) + if(set_nonblocking(this_client->socket)) error_exit(MSG_START "cannot set_nonblocking() on client->socket"); int client_buffer_index = 0; @@ -260,17 +273,17 @@ void* client_thread (void* param) // (Wait for the server process to wake me up.) while(!pool_read_buffer || client_buffer_index == lpool->size) { - char* pool_read_buffer = (char*)lpool->get_read_buffer(); + char* pool_read_buffer = (char*)lpool->get_read_buffer(this_client->tsmthread); pthread_mutex_lock(&this_client->wait_mutex); this_client->sleeping = 1; pthread_cond_wait(this_client->wait_condition, &this_client->wait_mutex); } //Wait for the socket to be available for write. - select(highfd, NULL, &client_select_fds, NULL, NULL); + select(client_highfd, NULL, &client_select_fds, NULL, NULL); //Read data from global tsmpool and write it to client socket - int ret = send(client->socket, pool_read_buffer + client_buffer_index, lpool->size - client_buffer_index, 0); + int ret = send(this_client->socket, pool_read_buffer + client_buffer_index, lpool->size - client_buffer_index, 0); if(ret == -1) { switch(errno) diff --git a/nmux.h b/nmux.h index 40003ef..f12db48 100644 --- a/nmux.h +++ b/nmux.h @@ -4,6 +4,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -25,7 +28,7 @@ typedef struct client_s int socket; int error; //set to non-zero on error (data transfer failed) pthread_t thread; - tsmthread_t tsmthread; + tsmthread_t* tsmthread; client_status_t status; //the following members are there to give access to some global variables inside the thread: tsmpool* lpool; //local pool @@ -37,7 +40,6 @@ typedef struct client_s void print_exit(const char* why); void sig_handler(int signo); void client_erase(client_t* client); -void clients_close_all_finished(); void* client_thread (void* param); void error_exit(const char* why); void maxfd(int* maxfd, int fd); diff --git a/tsmpool.cpp b/tsmpool.cpp index ff3b773..a82fd7a 100644 --- a/tsmpool.cpp +++ b/tsmpool.cpp @@ -1,6 +1,6 @@ #include "tsmpool.h" -tsmpool::tsmpool(size_t size, int num) +tsmpool::tsmpool(size_t size, int num) : size(size), num(num) //number of buffers of (size) to alloc { @@ -45,7 +45,7 @@ int tsmpool::remove_thread(tsmthread_t* thread) if(threads[i] == thread) { delete threads[i]; - threads.erase(i); + threads.erase(threads.begin()+i); break; } pthread_mutex_unlock(&this->mutex); diff --git a/tsmpool.h b/tsmpool.h index c1ff956..b2ca44a 100644 --- a/tsmpool.h +++ b/tsmpool.h @@ -4,6 +4,7 @@ //The reader threads have lower priority than the writer thread (they can be left behind if the don't read fast enough). #include +#include using namespace std;