diff --git a/ddcd.cpp b/ddcd.cpp index f8fe9d6..c62abe5 100644 --- a/ddcd.cpp +++ b/ddcd.cpp @@ -1,5 +1,5 @@ /* -This software is part of libcsdr, a set of simple DSP routines for +This software is part of libcsdr, a set of simple DSP routines for Software Defined Radio. Copyright (c) 2014, Andras Retzler @@ -30,19 +30,21 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "ddcd.h" + int host_port = 0; char host_address[100] = "127.0.0.1"; +int thread_cntr = 0; + +//CLI parameters int decimation = 0; float transition_bw = 0.05; -int bufsize = 1024; +int bufsize = 1024; //! currently unused int bufcnt = 1024; -int maxbufcnt = 1000 -int thread_cntr = 0; char ddc_method_str[100] = "td"; ddc_method_t ddc_method; void sig_handler(int signo) -{ +{ fprintf(stderr, MSG_START "signal %d caught, exiting ddcd...\n", signo); fflush(stderr); exit(0); @@ -59,12 +61,13 @@ int main(int argc, char* argv[]) {"address", required_argument, 0, 'a' }, {"decimation", required_argument, 0, 'd' }, {"bufsize", required_argument, 0, 'b' }, + {"bufcnt", required_argument, 0, 'n' }, {"method", required_argument, 0, 'm' }, {"transition", required_argument, 0, 't' } }; - c = getopt_long(argc, argv, "p:a:d:b:m:t:", long_options, &option_index); + c = getopt_long(argc, argv, "p:a:d:b:n:m:t:", long_options, &option_index); if(c==-1) break; - switch (c) + switch (c) { case 'a': host_address[100-1]=0; @@ -79,6 +82,9 @@ int main(int argc, char* argv[]) case 'b': bufsize=atoi(optarg); break; + case 'n': + bufcnt=atoi(optarg); + break; case 'm': ddc_method_str[100-1]=0; strncpy(ddc_method_str,optarg,100-1); @@ -93,22 +99,23 @@ int main(int argc, char* argv[]) print_exit(MSG_START "error in getopt_long()\n"); } } - + if(!decimation) print_exit(MSG_START "missing required command line argument, --decimation.\n"); if(!host_port) print_exit(MSG_START "missing required command line argument, --port.\n"); if(decimation<0) print_exit(MSG_START "invalid value for --decimation (should be >0).\n"); if(decimation==1) fprintf(stderr, MSG_START "decimation = 1, just copying raw samples.\n"); if(transition_bw<0||transition_bw>0.5) print_exit(MSG_START "invalid value for --transition (should be between 0 and 0.5).\n"); - - if(decimation==1); //don't do anything then - else if(!strcmp(ddc_method_str,"td")) + if(bufsize<0) print_exit(MSG_START "invalid value for --bufsize (should be >0)\n"); + if(bufcnt<0) print_exit(MSG_START "invalid value for --bufcnt (should be >0)\n"); + if(decimation==1); //don't do anything then //!will have to take care about this later + else if(!strcmp(ddc_method_str,"td")) { - ddc_method = M_TD; + ddc_method = M_TD; fprintf(stderr, MSG_START "method is M_TD (default).\n"); } - else if (!strcmp(ddc_method_str,"fastddc")) + else if (!strcmp(ddc_method_str,"fastddc")) { - ddc_method = M_FASTDDC; + ddc_method = M_FASTDDC; fprintf(stderr, MSG_START "method is M_FASTDDC.\n"); } else print_exit(MSG_START "invalid parameter given to --method.\n"); @@ -122,7 +129,7 @@ int main(int argc, char* argv[]) sigaction(SIGQUIT, &sa, NULL); sigaction(SIGINT, &sa, NULL); sigaction(SIGHUP, &sa, NULL); - + struct sockaddr_in addr_host; int listen_socket; std::vector clients; @@ -132,13 +139,13 @@ int main(int argc, char* argv[]) int sockopt = 1; if( setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)) == -1 ) error_exit(MSG_START "cannot set SO_REUSEADDR"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453 - + memset(&addr_host,'0',sizeof(addr_host)); addr_host.sin_family = AF_INET; addr_host.sin_port = htons(host_port); addr_host.sin_addr.s_addr = INADDR_ANY; - if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE ) + if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE ) error_exit(MSG_START "invalid host address"); if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 ) @@ -153,17 +160,24 @@ int main(int argc, char* argv[]) socklen_t addr_cli_len = sizeof(addr_cli); int new_socket; - int highfd = 0; + int highfd = 0; FD_ZERO(&select_fds); FD_SET(listen_socket, &select_fds); maxfd(&highfd, listen_socket); FD_SET(input_fd, &select_fds); maxfd(&highfd, input_fd); - //Set stdin and listen_socket to non-blocking - if(set_nonblocking(input_fd) || set_nonblocking(listen_socket)) //don't do it before subprocess fork! + //Set stdin and listen_socket to non-blocking + if(set_nonblocking(input_fd) || set_nonblocking(listen_socket)) error_exit(MSG_START "cannot set_nonblocking()"); + //Create tsmpool + tsmpool* pool = new tsmpool(bufsize, bufcnt); + + unsigned char* current_write_buffer = pool->get_write_buffer(); + int index_in_current_write_buffer = 0; + + for(;;) { //Let's wait until there is any new data to read, or any new connection! @@ -171,71 +185,107 @@ int main(int argc, char* argv[]) //Is there a new client connection? if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) - { - this_client = new client_t; - this_client->error = 0; - memcpy(&this_client->addr, &addr_cli, sizeof(this_client->addr)); - this_client->socket = new_socket; - this_client->id = thread_cntr++; - - if(pthread_create(&this_client->thread, NULL, client_thread , (void*)&this_client)<0) + { + clients_close_all_finished(); + int new_client_id = clients_get_new_id(); + if(new_client_id!=-1 && pthread_create(&new_client->thread, NULL, client_thread , (void*)&new_client)<0) { //We're the parent - clients.push_back(this_client); - fprintf(stderr, MSG_START "pthread_create() done, this_client->id: %d\n", this_client->id); + client_t* new_client = new client_t; + new_client->error = 0; + memcpy(&new_client->addr, &addr_cli, sizeof(new_client->addr)); + new_client->socket = new_socket; + new_client->id = new_client_id; + new_client->status = CS_CREATED; + clients.push_back(new_client); + fprintf(stderr, MSG_START "pthread_create() done, new_client->id: %d\n", new_client->id); } + else fprintf(stderr, MSG_START "pthread_create() failed."); } - float* pool_next = pool->get_write_buffer(); - - int retval = read(input_fd, pool_next, mainpool->size); - if(retval==0) + if(index_in_current_write_buffer >= bufsize) { - //end of input stream, close clients and exit + current_write_buffer = pool->get_write_buffer(); + index_in_current_write_buffer = 0; } - else if(retval != -1) + int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer); + if(retval>0) { - for (int i=0; ipipefd[1], buf, retval)==-1) - { - - if(!clients[i]->error) - { - print_client(clients[i], "lost buffer, failed to write pipe."); - clients[i]->error=1; - } - //fprintf(stderr, MSG_START "errno is %d\n", errno); //usually 11 - //int wpstatus; - //int wpresult = waitpid(clients[i]->pid, &wpstatus, WNOHANG); - //fprintf(stderr, MSG_START "pid is %d\n",clients[i]->pid); - //perror("somethings wrong"); - //if(wpresult == -1) print_client(clients[i], "error while waitpid()!"); - //else if(wpresult == 0) - waitpid(clients[i]->pid, NULL, WNOHANG); - if(!proc_exists(clients[i]->pid)) - { - //Client exited! - print_client(clients[i], "closing client from main process."); - close(clients[i]->pipefd[1]); - close(clients[i]->socket); - delete clients[i]; - clients.erase(clients.begin()+i); - fprintf(stderr, MSG_START "done closing client from main process.\n"); - } - } - else { if(clients[i]->error) print_client(clients[i], "pipe okay again."); clients[i]->error=0; } - } + index_in_current_write_buffer += retval; + } + else if(retval==0) + { + //!end of input stream, close clients and exit + print_exit(MSG_START "end of input, exiting.\n") } - //TODO: at the end, server closes pipefd[1] for client - } - - } -void* client_thread (void* param) +#if 0 +for (int i=0; ipipefd[1], buf, retval)==-1) + { + + if(!clients[i]->error) + { + print_client(clients[i], "lost buffer, failed to write pipe."); + clients[i]->error=1; + } + //fprintf(stderr, MSG_START "errno is %d\n", errno); //usually 11 + //int wpstatus; + //int wpresult = waitpid(clients[i]->pid, &wpstatus, WNOHANG); + //fprintf(stderr, MSG_START "pid is %d\n",clients[i]->pid); + //perror("somethings wrong"); + //if(wpresult == -1) print_client(clients[i], "error while waitpid()!"); + //else if(wpresult == 0) + waitpid(clients[i]->pid, NULL, WNOHANG); + if(!proc_exists(clients[i]->pid)) + { + //Client exited! + print_client(clients[i], "closing client from main process."); + close(clients[i]->pipefd[1]); + close(clients[i]->socket); + delete clients[i]; + clients.erase(clients.begin()+i); + fprintf(stderr, MSG_START "done closing client from main process.\n"); + } + } + else { if(clients[i]->error) print_client(clients[i], "pipe okay again."); clients[i]->error=0; } +} +} +//TODO: at the end, server closes pipefd[1] for client +#endif + +int clients_get_new_id() +{ + int new_id=-1; + for(int i=0; iid) new_id = clients[i]->id; + if(new_id!=INT_MAX) return ++new_id; //will also work if clients is empty (will return -1+1==0) + //should test this part, too: + for(new_id=0;new_idid==new_id) { found = 1; break; } + if(found) continue; + else return new_id; + } + return -1; +} + +void clients_close_all_finished() +{ + for(int i=0;istatus == CS_THREAD_FINISHED) clients.erase(i); + } +} + +void* client_thread (void* param) //!TODO +{ + client_t* me_the_client = (client_t*)param; + + return NULL; } void error_exit(const char* why) @@ -252,6 +302,5 @@ void print_exit(const char* why) void maxfd(int* maxfd, int fd) { - if(fd>=*maxfd) *maxfd=fd+1; + if(fd>=*maxfd) *maxfd=fd+1; } - diff --git a/ddcd.h b/ddcd.h index 5436476..a6f67b1 100644 --- a/ddcd.h +++ b/ddcd.h @@ -9,27 +9,36 @@ #include #include #include +#include #define SOFTWARE_NAME "ddcd" #define MSG_START SOFTWARE_NAME ": " -typedef enum ddc_method_e +typedef enum ddc_method_e { M_TD, M_FASTDDC } ddc_method_t; +typedef enum client_status_e +{ + CS_CREATED, + CS_THREAD_RUNNING, + CS_THREAD_FINISHED +} client_status_t; + + typedef struct client_s { struct sockaddr_in addr; int id; int socket; - int error; + int error; //set to non-zero on error (data transfer failed) pthread_t thread; + client_status_t status; + } client_t; void print_exit(const char* why); void error_exit(const char* why); void maxfd(int* maxfd, int fd); - - diff --git a/tsmpool.cpp b/tsmpool.cpp index 3af0149..237ef44 100644 --- a/tsmpool.cpp +++ b/tsmpool.cpp @@ -1,8 +1,10 @@ -tsmpool::tsmpool(size_t size, int num) +#include "tsmpool.h" + +tsmpool::tsmpool(size_t size, int num) { this->threads_cntr = 0; - this->num = num; this->size = size; + this->num = num; //number of buffers of (size) to alloc this->ok = 1; this->lowest_read_index = -1; if (pthread_mutex_init(&this->mutex, NULL) != 0) this->ok=0; @@ -10,8 +12,8 @@ tsmpool::tsmpool(size_t size, int num) size_t tsmpool::get_size() { return this->size; } -void* tsmpool::get_write_buffer() -{ +void* tsmpool::get_write_buffer() +{ if(write_index==index_before(lowest_read_index)) return NULL; void* to_return = buffers[write_index]; write_index=index_next(write_index); @@ -19,7 +21,7 @@ void* tsmpool::get_write_buffer() tsmthread_t* tsmpool::register_thread() { - if(!ok) return -1; + if(!ok) return NULL; pthread_mutex_lock(&this->mutex); tsmthread_t* thread = new tsmthread_t; thread->read_index = write_index; diff --git a/tsmpool.h b/tsmpool.h index aa8385c..f855226 100644 --- a/tsmpool.h +++ b/tsmpool.h @@ -1,7 +1,12 @@ -typedef struct tsmthread_s -{ +//tsmpool stands for Thread-Safe Memory Pool. + +//It implements a big circular buffer that one thread writes into, and multiple threads read from. +//The reader threads have lower priority than the writer thread (they can be left behind if the don't read fast enough). + +typedef struct tsmthread_s +{ int read_index; //it always points to the next buffer to be read -} tsmthread_t; +} tsmthread_t; class tsmpool { @@ -25,5 +30,3 @@ public: int index_next(int index) { return (index+1==size)?0:index; } int index_before(int index) { return (index-1<0)?size-1:index; } } - -