This is a kind of stream multiplexer now (stdin input, TCP output to multiple clients), but closing clients is not handled yet.

This commit is contained in:
ha7ilm 2015-11-05 23:57:03 +01:00
parent e694e5a5b3
commit 37555f2a77
2 changed files with 138 additions and 47 deletions

146
ddcd.cpp
View file

@ -29,36 +29,33 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "ddcd.h" #include "ddcd.h"
#include <signal.h>
#include <stdio.h>
#include <getopt.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <iostream>
#include <vector>
#include <unistd.h>
#define SOFTWARE_NAME "ddcd" #define SOFTWARE_NAME "ddcd"
#define MSG_START SOFTWARE_NAME ": " #define MSG_START SOFTWARE_NAME ": "
typedef struct client_s
{
struct sockaddr_in addr;
int socket;
pid_t pid;
int pipefd[2];
} client_t;
int host_port = 0; int host_port = 0;
char host_address[100] = "127.0.0.1"; char host_address[100] = "127.0.0.1";
int decimation = 0; int decimation = 0;
int bufsize = 1024;
int bufsizeall;
char* buf;
int set_nonblocking(int fd)
{
int flagtmp;
if((flagtmp = fcntl(fd, F_GETFL))!=-1)
if((flagtmp = fcntl(fd, F_SETFL, flagtmp|O_NONBLOCK))!=-1)
return 0;
return 1;
}
client_t* this_client;
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
int c; int c;
fd_set select_fds;
for(;;) for(;;)
{ {
@ -66,9 +63,10 @@ int main(int argc, char* argv[])
static struct option long_options[] = { static struct option long_options[] = {
{"port", required_argument, 0, 'p' }, {"port", required_argument, 0, 'p' },
{"address", required_argument, 0, 'a' }, {"address", required_argument, 0, 'a' },
{"decimation", required_argument, 0, 'd' } {"decimation", required_argument, 0, 'd' },
{"bufsize", required_argument, 0, 'b' }
}; };
c = getopt_long(argc, argv, "p:a:d:", long_options, &option_index); c = getopt_long(argc, argv, "p:a:d:b:", long_options, &option_index);
if(c==-1) break; if(c==-1) break;
switch (c) switch (c)
{ {
@ -82,6 +80,9 @@ int main(int argc, char* argv[])
case 'd': case 'd':
decimation=atoi(optarg); decimation=atoi(optarg);
break; break;
case 'b':
bufsize=atoi(optarg);
break;
case 0: case 0:
case '?': case '?':
case ':': case ':':
@ -90,62 +91,125 @@ int main(int argc, char* argv[])
} }
} }
if(!decimation) { fprintf(stderr, MSG_START "missing required command line argument, --decimation.\n"); exit(1); } if(!decimation) error_exit(MSG_START "missing required command line argument, --decimation.\n");
if(!host_port) { fprintf(stderr, MSG_START "missing required command line argument, --port.\n"); exit(1); } if(!host_port) error_exit(MSG_START "missing required command line argument, --port.\n");
struct sockaddr_in addr_host; struct sockaddr_in addr_host;
int listen_socket; int listen_socket;
std::vector<client_t*> clients(10); std::vector<client_t*> clients;
clients.reserve(100);
listen_socket=socket(AF_INET,SOCK_STREAM,0); listen_socket=socket(AF_INET,SOCK_STREAM,0);
int sockopt = 1;
if( setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)) == -1 )
error_exit(MSG_START "cannot set SO_REUSEADDR.\n"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453
memset(&addr_host,'0',sizeof(addr_host)); memset(&addr_host,'0',sizeof(addr_host));
addr_host.sin_family=AF_INET; addr_host.sin_family=AF_INET;
addr_host.sin_port=htons(host_port); addr_host.sin_port=htons(host_port);
addr_host.sin_addr.s_addr = INADDR_ANY; addr_host.sin_addr.s_addr = INADDR_ANY;
if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE ) if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE )
{ fprintf(stderr, MSG_START "invalid host address.\n"); exit(1); } error_exit(MSG_START "invalid host address.\n");
if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 ) if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 )
{ fprintf(stderr, MSG_START "cannot bind() address to the socket.\n"); exit(1); } error_exit(MSG_START "cannot bind() address to the socket.\n");
if( listen(listen_socket, 10) == -1 ) if( listen(listen_socket, 10) == -1 )
{ fprintf(stderr, MSG_START "cannot listen() on socket.\n"); exit(1); } error_exit(MSG_START "cannot listen() on socket.\n");
for(;;)
{
struct sockaddr_in addr_cli; struct sockaddr_in addr_cli;
socklen_t addr_cli_len = sizeof(addr_cli); socklen_t addr_cli_len = sizeof(addr_cli);
int new_socket; int new_socket;
if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) == -1) //The server will wait on these sockets later...
//Set stdin and listen_socket to non-blocking
if(set_nonblocking(STDIN_FILENO) || set_nonblocking(listen_socket))
error_exit(MSG_START "cannot set_nonblocking().\n");
bufsizeall = bufsize*sizeof(char);
buf = (char*)malloc(bufsizeall);
FD_ZERO(&select_fds);
FD_SET(listen_socket, &select_fds);
FD_SET(STDIN_FILENO, &select_fds);
int highfd = ((listen_socket>STDIN_FILENO)?listen_socket:STDIN_FILENO) + 1;
for(;;)
{ {
fprintf(stderr, MSG_START "cannot accept() a connection.\n"); //Let's wait until there is any new data to read, or any new connection!
select(highfd, &select_fds, NULL, NULL, NULL);
//Is there a new client connection?
if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1)
{
this_client = new client_t;
memcpy(&this_client->addr, &addr_cli, sizeof(this_client->addr));
this_client->socket = new_socket;
if(pipe(this_client->pipefd) == -1)
{
perror(MSG_START "cannot open new pipe() for the client.\n");
continue; continue;
} }
if(this_client->pid = fork())
client_t* new_client = new client_t;
memcpy(&new_client->addr, &addr_cli, sizeof(new_client->addr));
new_client->socket = new_socket;
if(new_client->pid = fork())
{ {
//We're the parent //We're the parent
clients.push_back(new_client); set_nonblocking(this_client->pipefd[1]);
printf("client pid: %d\n", new_client->pid); clients.push_back(this_client);
printf("client pid: %d\n", this_client->pid);
} }
else else
{ {
//We're the client //We're the client
client(); client();
break; return 1;
} }
} }
int retval = read(STDIN_FILENO, buf, bufsizeall);
if(retval==0)
{
//end of input stream, close clients and exit
}
else if(retval != -1)
{
for (int i=0;i<clients.size(); i++)
{
if(write(clients[i]->pipefd[1], buf, retval)==-1)
print_client(clients[i], "lost buffer, failed to write pipe");
}
}
//TODO: at the end, server closes pipefd[1] for client
//close(this_client->pipefd[1]);
}
return 0; return 0;
} }
void print_client(client_t* client, const char* what)
{
fprintf(stderr,MSG_START " (client %s:%d) %s\n", inet_ntoa(client->addr.sin_addr), client->addr.sin_port, what);
}
void client_cleanup()
{
close(this_client->pipefd[0]);
}
void client() void client()
{ {
printf("I'm the client\n"); printf("I'm the client\n");
for(;;) sleep(1); for(;;)
{
read(this_client->pipefd[0],buf,bufsizeall);
send(this_client->socket,buf,bufsizeall,0);
}
}
void error_exit(const char* why)
{
perror(why);
exit(1);
} }

27
ddcd.h
View file

@ -1 +1,28 @@
#pragma once
#include <signal.h>
#include <stdio.h>
#include <getopt.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <iostream>
#include <vector>
#include <unistd.h>
#include <fcntl.h>
typedef struct client_s
{
struct sockaddr_in addr;
int socket;
pid_t pid;
int pipefd[2];
} client_t;
void client(); void client();
void error_exit(const char* why);
void print_client(client_t* client, const char* what);