2016-09-21 05:52:58 +08:00
/*
This software is part of libcsdr , a set of simple DSP routines for
Software Defined Radio .
Copyright ( c ) 2014 , Andras Retzler < randras @ sdr . hu >
All rights reserved .
Redistribution and use in source and binary forms , with or without
modification , are permitted provided that the following conditions are met :
* Redistributions of source code must retain the above copyright
notice , this list of conditions and the following disclaimer .
* Redistributions in binary form must reproduce the above copyright
notice , this list of conditions and the following disclaimer in the
documentation and / or other materials provided with the distribution .
* Neither the name of the copyright holder nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission .
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS " AS IS " AND
ANY EXPRESS OR IMPLIED WARRANTIES , INCLUDING , BUT NOT LIMITED TO , THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED . IN NO EVENT SHALL ANDRAS RETZLER BE LIABLE FOR ANY
DIRECT , INDIRECT , INCIDENTAL , SPECIAL , EXEMPLARY , OR CONSEQUENTIAL DAMAGES
( INCLUDING , BUT NOT LIMITED TO , PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES ;
LOSS OF USE , DATA , OR PROFITS ; OR BUSINESS INTERRUPTION ) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY , WHETHER IN CONTRACT , STRICT LIABILITY , OR TORT
( INCLUDING NEGLIGENCE OR OTHERWISE ) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE , EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE .
*/
2017-01-10 17:34:42 +08:00
# include "nmux.h"
2016-09-21 05:52:58 +08:00
2017-01-11 17:48:25 +08:00
char help_text [ ] = " nmux is a TCP stream multiplexer. It reads data from the standard input, and sends it to each client connected through TCP sockets. Available command line options are: \n "
" \t --port (-p), --address (-a): TCP port and address to listen. \n "
" \t --bufsize (-b), --bufcnt (-n): Internal buffer size and count. \n "
" \t --help (-h): Show this message. \n " ;
2016-09-21 05:52:58 +08:00
int host_port = 0 ;
char host_address [ 100 ] = " 127.0.0.1 " ;
int thread_cntr = 0 ;
//CLI parameters
2017-01-11 03:21:30 +08:00
int bufsize = 1024 ;
2016-09-21 05:52:58 +08:00
int bufcnt = 1024 ;
2017-01-10 17:34:42 +08:00
char * * global_argv ;
2016-09-21 05:52:58 +08:00
int global_argc ;
tsmpool * pool ;
2017-01-13 02:10:27 +08:00
pthread_cond_t wait_condition ;
pthread_mutex_t wait_mutex ;
2016-09-21 05:52:58 +08:00
void sig_handler ( int signo )
{
fprintf ( stderr , MSG_START " signal %d caught, exiting... \n " , signo ) ;
fflush ( stderr ) ;
exit ( 0 ) ;
}
int main ( int argc , char * argv [ ] )
{
global_argv = argv ;
global_argc = argc ;
int c ;
2017-01-11 17:48:25 +08:00
int no_options = 1 ;
2016-09-21 05:52:58 +08:00
for ( ; ; )
{
int option_index = 0 ;
static struct option long_options [ ] = {
{ " port " , required_argument , 0 , ' p ' } ,
{ " address " , required_argument , 0 , ' a ' } ,
{ " bufsize " , required_argument , 0 , ' b ' } ,
2017-01-11 17:48:25 +08:00
{ " bufcnt " , required_argument , 0 , ' n ' } ,
{ " help " , no_argument , 0 , ' h ' } ,
{ 0 , 0 , 0 , 0 }
2016-09-21 05:52:58 +08:00
} ;
2017-01-11 17:48:25 +08:00
c = getopt_long ( argc , argv , " p:a:b:n:h " , long_options , & option_index ) ;
2016-09-21 05:52:58 +08:00
if ( c = = - 1 ) break ;
2017-01-11 17:48:25 +08:00
no_options = 0 ;
2016-09-21 05:52:58 +08:00
switch ( c )
{
case ' a ' :
host_address [ 100 - 1 ] = 0 ;
strncpy ( host_address , optarg , 100 - 1 ) ;
break ;
case ' p ' :
host_port = atoi ( optarg ) ;
break ;
case ' b ' :
bufsize = atoi ( optarg ) ;
break ;
case ' n ' :
bufcnt = atoi ( optarg ) ;
break ;
2017-01-11 17:48:25 +08:00
case ' h ' :
print_exit ( help_text ) ;
break ;
2016-09-21 05:52:58 +08:00
case 0 :
case ' ? ' :
case ' : ' :
2017-01-11 17:48:25 +08:00
default :
2017-01-12 23:24:59 +08:00
print_exit ( MSG_START " error in getopt_long() \n " ) ;
2016-09-21 05:52:58 +08:00
}
}
2017-01-11 17:48:25 +08:00
if ( no_options ) print_exit ( help_text ) ;
2016-09-21 05:52:58 +08:00
if ( ! host_port ) print_exit ( MSG_START " missing required command line argument, --port. \n " ) ;
2017-01-13 00:50:51 +08:00
if ( bufsize < = 0 ) print_exit ( MSG_START " invalid value for --bufsize (should be >0) \n " ) ;
if ( bufcnt < = 0 ) print_exit ( MSG_START " invalid value for --bufcnt (should be >0) \n " ) ;
2016-09-21 05:52:58 +08:00
//set signals
struct sigaction sa ;
memset ( & sa , 0 , sizeof ( sa ) ) ;
sa . sa_handler = sig_handler ;
sigaction ( SIGTERM , & sa , NULL ) ;
sigaction ( SIGKILL , & sa , NULL ) ;
sigaction ( SIGQUIT , & sa , NULL ) ;
sigaction ( SIGINT , & sa , NULL ) ;
sigaction ( SIGHUP , & sa , NULL ) ;
struct sockaddr_in addr_host ;
int listen_socket ;
std : : vector < client_t * > clients ;
clients . reserve ( 100 ) ;
listen_socket = socket ( AF_INET , SOCK_STREAM , 0 ) ;
int sockopt = 1 ;
if ( setsockopt ( listen_socket , SOL_SOCKET , SO_REUSEADDR , ( char * ) & sockopt , sizeof ( sockopt ) ) = = - 1 )
2017-01-12 23:24:59 +08:00
error_exit ( MSG_START " cannot set SO_REUSEADDR " ) ; //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453
2016-09-21 05:52:58 +08:00
memset ( & addr_host , ' 0 ' , sizeof ( addr_host ) ) ;
addr_host . sin_family = AF_INET ;
addr_host . sin_port = htons ( host_port ) ;
addr_host . sin_addr . s_addr = INADDR_ANY ;
if ( ( addr_host . sin_addr . s_addr = inet_addr ( host_address ) ) = = INADDR_NONE )
2017-01-12 23:24:59 +08:00
error_exit ( MSG_START " invalid host address " ) ;
2016-09-21 05:52:58 +08:00
if ( bind ( listen_socket , ( struct sockaddr * ) & addr_host , sizeof ( addr_host ) ) < 0 )
2017-01-12 23:24:59 +08:00
error_exit ( MSG_START " cannot bind() address to the socket " ) ;
2016-09-21 05:52:58 +08:00
if ( listen ( listen_socket , 10 ) = = - 1 )
2017-01-12 23:24:59 +08:00
error_exit ( MSG_START " cannot listen() on socket " ) ;
2016-09-21 05:52:58 +08:00
2017-01-12 23:24:59 +08:00
fprintf ( stderr , MSG_START " listening on %s:%d \n " , inet_ntoa ( addr_host . sin_addr ) , host_port ) ;
2016-09-21 05:52:58 +08:00
struct sockaddr_in addr_cli ;
socklen_t addr_cli_len = sizeof ( addr_cli ) ;
int new_socket ;
int highfd = 0 ;
maxfd ( & highfd , listen_socket ) ;
2017-01-13 06:52:53 +08:00
maxfd ( & highfd , STDIN_FILENO ) ;
fd_set select_fds ;
2016-09-21 05:52:58 +08:00
//Set stdin and listen_socket to non-blocking
2017-01-13 06:52:53 +08:00
if ( set_nonblocking ( STDIN_FILENO ) | | set_nonblocking ( listen_socket ) )
2017-01-12 23:24:59 +08:00
error_exit ( MSG_START " cannot set_nonblocking() " ) ;
2016-09-21 05:52:58 +08:00
//Create tsmpool
pool = new tsmpool ( bufsize , bufcnt ) ;
2017-01-12 23:24:59 +08:00
if ( ! pool - > is_ok ( ) ) print_exit ( MSG_START " tsmpool failed to initialize \n " ) ;
2016-09-21 05:52:58 +08:00
2017-01-11 17:48:25 +08:00
unsigned char * current_write_buffer = ( unsigned char * ) pool - > get_write_buffer ( ) ;
2016-09-21 05:52:58 +08:00
int index_in_current_write_buffer = 0 ;
2017-01-11 03:21:30 +08:00
//Create wait condition: client threads waiting for input data from the main thread will be
// waiting on this condition. They will be woken up with pthread_cond_broadcast() if new
// data arrives.
2017-01-13 02:10:27 +08:00
if ( pthread_cond_init ( & wait_condition , NULL ) )
2017-01-12 23:24:59 +08:00
print_exit ( MSG_START " pthread_cond_init failed " ) ; //cond_attrs is ignored by Linux
2017-01-13 02:10:27 +08:00
if ( pthread_mutex_init ( & wait_mutex , NULL ) )
print_exit ( MSG_START " pthread_mutex_t failed " ) ; //cond_attrs is ignored by Linux
2016-09-21 05:52:58 +08:00
for ( ; ; )
{
2017-01-13 06:52:53 +08:00
FD_ZERO ( & select_fds ) ;
FD_SET ( listen_socket , & select_fds ) ;
FD_SET ( STDIN_FILENO , & select_fds ) ;
2017-01-13 00:45:59 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " mainfor: selecting... " ) ;
2016-09-21 05:52:58 +08:00
//Let's wait until there is any new data to read, or any new connection!
2017-01-13 06:52:53 +08:00
int select_ret = select ( highfd , & select_fds , NULL , NULL , NULL ) ;
2017-01-13 00:45:59 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " selected. \n " ) ;
2017-01-13 06:52:53 +08:00
if ( select_ret = = - 1 ) error_exit ( " mainfor select() error " ) ;
2017-01-13 06:32:45 +08:00
2016-09-21 05:52:58 +08:00
//Is there a new client connection?
2017-01-13 06:52:53 +08:00
if ( FD_ISSET ( listen_socket , & select_fds ) & & ( ( new_socket = accept ( listen_socket , ( struct sockaddr * ) & addr_cli , & addr_cli_len ) ) ! = - 1 ) )
2016-09-21 05:52:58 +08:00
{
2017-01-16 03:13:24 +08:00
if ( NMUX_DEBUG )
{
fprintf ( stderr , " \x1b [1m \x1b [33mmainfor: clients before closing: " ) ;
2017-01-25 00:49:04 +08:00
for ( int i = 0 ; i < clients . size ( ) ; i + + ) fprintf ( stderr , " 0x%x " , ( intptr_t ) clients [ i ] ) ;
2017-01-16 03:13:24 +08:00
fprintf ( stderr , " \x1b [0m \n " ) ;
}
2017-01-12 23:56:54 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " mainfor: accepted (socket = %d). \n " , new_socket ) ;
2017-01-11 17:48:25 +08:00
//Close all finished clients
for ( int i = 0 ; i < clients . size ( ) ; i + + )
{
if ( clients [ i ] - > status = = CS_THREAD_FINISHED )
{
2017-01-12 23:24:59 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " mainfor: client removed: %d \n " , i ) ;
2017-01-13 20:00:48 +08:00
//client destructor
pool - > remove_thread ( clients [ i ] - > tsmthread ) ;
2017-01-11 17:48:25 +08:00
clients . erase ( clients . begin ( ) + i ) ;
2017-01-13 20:00:48 +08:00
i - - ;
2017-01-11 17:48:25 +08:00
}
}
2017-01-13 20:00:48 +08:00
if ( NMUX_DEBUG )
{
2017-01-16 03:13:24 +08:00
fprintf ( stderr , " \x1b [1m \x1b [33mmainfor: clients after closing: " ) ;
2017-01-25 00:49:04 +08:00
for ( int i = 0 ; i < clients . size ( ) ; i + + ) fprintf ( stderr , " 0x%x " , ( intptr_t ) clients [ i ] ) ;
2017-01-13 20:00:48 +08:00
fprintf ( stderr , " \x1b [0m \n " ) ;
}
2016-09-21 05:52:58 +08:00
//We're the parent, let's create a new client and initialize it
client_t * new_client = new client_t ;
new_client - > error = 0 ;
2017-01-12 22:27:48 +08:00
memcpy ( & new_client - > addr , & addr_cli , sizeof ( struct sockaddr_in ) ) ;
2016-09-21 05:52:58 +08:00
new_client - > socket = new_socket ;
new_client - > status = CS_CREATED ;
new_client - > tsmthread = pool - > register_thread ( ) ;
new_client - > lpool = pool ;
new_client - > sleeping = 0 ;
2017-01-12 23:56:54 +08:00
if ( pthread_create ( & new_client - > thread , NULL , client_thread , ( void * ) new_client ) = = 0 )
2016-09-21 05:52:58 +08:00
{
clients . push_back ( new_client ) ;
2017-01-12 23:24:59 +08:00
fprintf ( stderr , MSG_START " pthread_create() done, clients now: %d \n " , clients . size ( ) ) ;
2016-09-21 05:52:58 +08:00
}
else
{
2017-01-12 23:24:59 +08:00
fprintf ( stderr , MSG_START " pthread_create() failed. \n " ) ;
2017-01-12 22:27:48 +08:00
pool - > remove_thread ( new_client - > tsmthread ) ;
delete new_client ;
2016-09-21 05:52:58 +08:00
}
}
2017-01-13 06:52:53 +08:00
if ( FD_ISSET ( STDIN_FILENO , & select_fds ) )
2016-09-21 05:52:58 +08:00
{
2017-01-13 06:52:53 +08:00
if ( index_in_current_write_buffer > = bufsize )
{
if ( NMUX_DEBUG ) fprintf ( stderr , " mainfor: gwbing... " ) ;
current_write_buffer = ( unsigned char * ) pool - > get_write_buffer ( ) ;
if ( NMUX_DEBUG ) fprintf ( stderr , " gwbed. \n mainfor: cond broadcasting... " ) ;
2017-01-13 19:00:43 +08:00
pthread_mutex_lock ( & wait_mutex ) ;
2017-01-13 06:52:53 +08:00
pthread_cond_broadcast ( & wait_condition ) ;
2017-01-13 19:00:43 +08:00
pthread_mutex_unlock ( & wait_mutex ) ;
2017-01-13 06:52:53 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " cond broadcasted. \n " ) ;
//Shouldn't we do it after we put data in?
// No, on get_write_buffer() actually the previous buffer is getting available
// for read for threads that wait for new data (wait on global pthead mutex
// wait_condition).
index_in_current_write_buffer = 0 ;
}
2017-01-13 01:57:23 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " mainfor: reading... \n " ) ;
2017-01-13 06:52:53 +08:00
int read_ret = read ( STDIN_FILENO , current_write_buffer + index_in_current_write_buffer , bufsize - index_in_current_write_buffer ) ;
if ( NMUX_DEBUG ) fprintf ( stderr , " read %d \n " , read_ret ) ;
if ( read_ret > 0 )
2017-01-13 01:57:23 +08:00
{
2017-01-13 06:52:53 +08:00
index_in_current_write_buffer + = read_ret ;
2017-01-13 01:57:23 +08:00
}
2017-01-13 06:52:53 +08:00
else if ( read_ret = = 0 )
2017-01-13 01:57:23 +08:00
{
//End of input stream, close clients and exit
print_exit ( MSG_START " (main thread/for) end input stream, exiting. \n " ) ;
}
2017-01-13 06:52:53 +08:00
else if ( read_ret = = - 1 )
2017-01-13 01:57:23 +08:00
{
2017-01-13 06:52:53 +08:00
if ( errno = = EAGAIN ) { if ( NMUX_DEBUG ) fprintf ( stderr , " mainfor: read EAGAIN \n " ) ; /* seems like select would block forever, so we just read again */ }
2017-01-13 01:57:23 +08:00
else error_exit ( MSG_START " (main thread/for) error in read ( ) , exiting . \ n " ) ;
}
2016-09-21 05:52:58 +08:00
}
}
}
void * client_thread ( void * param )
{
2017-01-25 00:49:04 +08:00
fprintf ( stderr , " client 0x%x: started! \n " , ( intptr_t ) param ) ;
2016-09-21 05:52:58 +08:00
client_t * this_client = ( client_t * ) param ;
this_client - > status = CS_THREAD_RUNNING ;
int retval ;
tsmpool * lpool = this_client - > lpool ;
2017-01-25 00:49:04 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " client 0x%x: socket = %d! \n " , ( intptr_t ) param , this_client - > socket ) ;
2017-01-11 03:21:30 +08:00
2017-01-25 00:49:04 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " client 0x%x: poll init... " , ( intptr_t ) param ) ;
2017-01-12 23:24:59 +08:00
struct pollfd pollfds [ 1 ] ;
pollfds [ 0 ] . fd = this_client - > socket ;
pollfds [ 0 ] . events = POLLOUT ;
pollfds [ 0 ] . revents = 0 ;
2017-01-13 00:45:59 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " client poll inited. \n " ) ;
2017-01-12 23:24:59 +08:00
//Set this_client->socket to non-blocking
2017-01-11 17:48:25 +08:00
if ( set_nonblocking ( this_client - > socket ) )
2017-01-12 23:24:59 +08:00
error_exit ( MSG_START " cannot set_nonblocking() on this_client->socket " ) ;
2017-01-11 03:21:30 +08:00
int client_buffer_index = 0 ;
2017-01-12 23:24:59 +08:00
int client_goto_source = 0 ;
2017-01-11 03:21:30 +08:00
char * pool_read_buffer = NULL ;
2016-09-21 05:52:58 +08:00
for ( ; ; )
{
2017-01-11 03:21:30 +08:00
//Wait until there is any data to send.
// If I haven't sent all the data from my last buffer, don't wait.
// (Wait for the server process to wake me up.)
2017-01-12 22:27:48 +08:00
while ( ! pool_read_buffer | | client_buffer_index > = lpool - > size )
2016-09-21 05:52:58 +08:00
{
2017-01-25 00:49:04 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " client 0x%x: trying to grb \n " , ( intptr_t ) param ) ;
2017-01-12 23:56:54 +08:00
pool_read_buffer = ( char * ) lpool - > get_read_buffer ( this_client - > tsmthread ) ;
if ( pool_read_buffer ) { client_buffer_index = 0 ; break ; }
2017-01-25 00:49:04 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " client 0x%x: cond_waiting for more data \n " , ( intptr_t ) param ) ;
2017-01-13 02:10:27 +08:00
pthread_mutex_lock ( & wait_mutex ) ;
2016-09-21 05:52:58 +08:00
this_client - > sleeping = 1 ;
2017-01-13 02:10:27 +08:00
pthread_cond_wait ( & wait_condition , & wait_mutex ) ;
2017-01-13 19:00:43 +08:00
pthread_mutex_unlock ( & wait_mutex ) ;
2016-09-21 05:52:58 +08:00
}
2017-01-11 03:21:30 +08:00
//Wait for the socket to be available for write.
2017-01-25 00:49:04 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " client 0x%x: polling for socket write... " , ( intptr_t ) param ) ;
2017-01-12 23:24:59 +08:00
int ret = poll ( pollfds , 1 , - 1 ) ;
2017-01-13 00:45:59 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " client polled for socket write. \n " ) ;
2017-01-12 23:24:59 +08:00
if ( ret = = 0 ) continue ;
else if ( ret = = - 1 ) { client_goto_source = 1 ; goto client_thread_exit ; }
2016-09-21 05:52:58 +08:00
2017-01-11 03:21:30 +08:00
//Read data from global tsmpool and write it to client socket
2017-01-25 00:49:04 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " client 0x%x: sending... " , ( intptr_t ) param ) ;
2017-01-13 20:00:48 +08:00
ret = send ( this_client - > socket , pool_read_buffer + client_buffer_index , lpool - > size - client_buffer_index , MSG_NOSIGNAL ) ;
2017-01-13 00:45:59 +08:00
if ( NMUX_DEBUG ) fprintf ( stderr , " client sent. \n " ) ;
2017-01-11 03:21:30 +08:00
if ( ret = = - 1 )
{
switch ( errno )
{
case EAGAIN : break ;
2017-01-12 23:24:59 +08:00
default : client_goto_source = 2 ; goto client_thread_exit ;
2017-01-11 03:21:30 +08:00
}
}
else client_buffer_index + = ret ;
2016-09-21 05:52:58 +08:00
}
2017-01-11 03:21:30 +08:00
client_thread_exit :
2017-01-25 00:49:04 +08:00
fprintf ( stderr , " client 0x%x: CS_THREAD_FINISHED, client_goto_source = %d, errno = %d " , ( intptr_t ) param , client_goto_source , errno ) ;
2017-01-13 20:00:48 +08:00
this_client - > status = CS_THREAD_FINISHED ;
2016-09-21 05:52:58 +08:00
pthread_exit ( NULL ) ;
return NULL ;
}
2017-01-11 03:21:30 +08:00
int set_nonblocking ( int fd )
{
int flagtmp ;
if ( ( flagtmp = fcntl ( fd , F_GETFL ) ) ! = - 1 )
if ( ( flagtmp = fcntl ( fd , F_SETFL , flagtmp | O_NONBLOCK ) ) ! = - 1 )
return 0 ;
return 1 ;
}
2016-09-21 05:52:58 +08:00
void error_exit ( const char * why )
{
perror ( why ) ; //do we need a \n at the end of (why)?
exit ( 1 ) ;
}
void print_exit ( const char * why )
{
fprintf ( stderr , " %s " , why ) ;
exit ( 1 ) ;
}
void maxfd ( int * maxfd , int fd )
{
if ( fd > = * maxfd ) * maxfd = fd + 1 ;
}