Introduce process observer

Add a tool to easily observe process termination.

This allows to move this complexity out of the server code.
This commit is contained in:
Romain Vimont 2021-11-12 09:49:37 +01:00
parent aa011832c1
commit 03de9224fc
4 changed files with 171 additions and 53 deletions

View file

@ -329,19 +329,6 @@ server_init(struct server *server) {
server->serial = NULL; server->serial = NULL;
server->process = SC_PROCESS_NONE; server->process = SC_PROCESS_NONE;
bool ok = sc_mutex_init(&server->mutex);
if (!ok) {
return false;
}
ok = sc_cond_init(&server->process_terminated_cond);
if (!ok) {
sc_mutex_destroy(&server->mutex);
return false;
}
server->process_terminated = false;
server->server_socket = SC_INVALID_SOCKET; server->server_socket = SC_INVALID_SOCKET;
server->video_socket = SC_INVALID_SOCKET; server->video_socket = SC_INVALID_SOCKET;
server->control_socket = SC_INVALID_SOCKET; server->control_socket = SC_INVALID_SOCKET;
@ -354,25 +341,20 @@ server_init(struct server *server) {
return true; return true;
} }
static int static void
run_wait_server(void *data) { server_on_terminated(void *userdata) {
struct server *server = data; struct server *server = userdata;
sc_process_wait(server->process, false); // ignore exit code
sc_mutex_lock(&server->mutex); // No need for synchronization, server_socket is initialized before the
server->process_terminated = true; // observer thread is created.
sc_cond_signal(&server->process_terminated_cond);
sc_mutex_unlock(&server->mutex);
// no need for synchronization, server_socket is initialized before this
// thread was created
if (server->server_socket != SC_INVALID_SOCKET) { if (server->server_socket != SC_INVALID_SOCKET) {
// Unblock any accept() // If the server process dies before connecting to the server socket,
// then the client will be stuck forever on accept(). To avoid the
// problem, wake up the accept() call when the server dies.
net_interrupt(server->server_socket); net_interrupt(server->server_socket);
} }
LOGD("Server terminated"); LOGD("Server terminated");
return 0;
} }
bool bool
@ -400,14 +382,11 @@ server_start(struct server *server, const struct server_params *params) {
goto error; goto error;
} }
// If the server process dies before connecting to the server socket, then static const struct sc_process_listener listener = {
// the client will be stuck forever on accept(). To avoid the problem, we .on_terminated = server_on_terminated,
// must be able to wake up the accept() call when the server dies. To keep };
// things simple and multiplatform, just spawn a new thread waiting for the bool ok = sc_process_observer_init(&server->observer, server->process,
// server process and calling shutdown()/close() on the server socket if &listener, server);
// necessary to wake up any accept() blocking call.
bool ok = sc_thread_create(&server->wait_server_thread, run_wait_server,
"wait-server", server);
if (!ok) { if (!ok) {
sc_process_terminate(server->process); sc_process_terminate(server->process);
sc_process_wait(server->process, true); // ignore exit code sc_process_wait(server->process, true); // ignore exit code
@ -516,27 +495,25 @@ server_stop(struct server *server) {
} }
// Give some delay for the server to terminate properly // Give some delay for the server to terminate properly
sc_mutex_lock(&server->mutex);
bool signaled = false;
if (!server->process_terminated) {
#define WATCHDOG_DELAY SC_TICK_FROM_SEC(1) #define WATCHDOG_DELAY SC_TICK_FROM_SEC(1)
signaled = sc_cond_timedwait(&server->process_terminated_cond, sc_tick deadline = sc_tick_now() + WATCHDOG_DELAY;
&server->mutex, bool terminated =
sc_tick_now() + WATCHDOG_DELAY); sc_process_observer_timedwait(&server->observer, deadline);
}
sc_mutex_unlock(&server->mutex);
// After this delay, kill the server if it's not dead already. // After this delay, kill the server if it's not dead already.
// On some devices, closing the sockets is not sufficient to wake up the // On some devices, closing the sockets is not sufficient to wake up the
// blocking calls while the device is asleep. // blocking calls while the device is asleep.
if (!signaled) { if (!terminated) {
// The process is terminated, but not reaped (closed) yet, so its PID // The process may have terminated since the check, but it is not
// is still valid. // reaped (closed) yet, so its PID is still valid, and it is ok to call
// sc_process_terminate() even in that case.
LOGW("Killing the server..."); LOGW("Killing the server...");
sc_process_terminate(server->process); sc_process_terminate(server->process);
} }
sc_thread_join(&server->wait_server_thread, NULL); sc_process_observer_join(&server->observer);
sc_process_observer_destroy(&server->observer);
sc_process_close(server->process); sc_process_close(server->process);
} }
@ -558,6 +535,4 @@ server_destroy(struct server *server) {
} }
} }
free(server->serial); free(server->serial);
sc_cond_destroy(&server->process_terminated_cond);
sc_mutex_destroy(&server->mutex);
} }

View file

@ -22,12 +22,10 @@ struct server_info {
struct server { struct server {
char *serial; char *serial;
sc_pid process;
sc_thread wait_server_thread;
sc_mutex mutex; sc_pid process;
sc_cond process_terminated_cond; // alive only between start() and stop()
bool process_terminated; struct sc_process_observer observer;
sc_socket server_socket; // only used if !tunnel_forward sc_socket server_socket; // only used if !tunnel_forward
sc_socket video_socket; sc_socket video_socket;

View file

@ -1,5 +1,6 @@
#include "process.h" #include "process.h"
#include <assert.h>
#include <libgen.h> #include <libgen.h>
#include "log.h" #include "log.h"
@ -41,3 +42,80 @@ sc_pipe_read_all(sc_pipe pipe, char *data, size_t len) {
} }
return copied; return copied;
} }
static int
run_observer(void *data) {
struct sc_process_observer *observer = data;
sc_process_wait(observer->pid, false); // ignore exit code
sc_mutex_lock(&observer->mutex);
observer->terminated = true;
sc_cond_signal(&observer->cond_terminated);
sc_mutex_unlock(&observer->mutex);
if (observer->listener) {
observer->listener->on_terminated(observer->listener_userdata);
}
return 0;
}
bool
sc_process_observer_init(struct sc_process_observer *observer, sc_pid pid,
const struct sc_process_listener *listener,
void *listener_userdata) {
// Either no listener, or on_terminated() is defined
assert(!listener || listener->on_terminated);
bool ok = sc_mutex_init(&observer->mutex);
if (!ok) {
return false;
}
ok = sc_cond_init(&observer->cond_terminated);
if (!ok) {
sc_mutex_destroy(&observer->mutex);
return false;
}
observer->pid = pid;
observer->listener = listener;
observer->listener_userdata = listener_userdata;
observer->terminated = false;
ok = sc_thread_create(&observer->thread, run_observer, "process_observer",
observer);
if (!ok) {
sc_cond_destroy(&observer->cond_terminated);
sc_mutex_destroy(&observer->mutex);
return false;
}
return true;
}
bool
sc_process_observer_timedwait(struct sc_process_observer *observer,
sc_tick deadline) {
sc_mutex_lock(&observer->mutex);
bool timed_out = false;
while (!observer->terminated && !timed_out) {
timed_out = !sc_cond_timedwait(&observer->cond_terminated,
&observer->mutex, deadline);
}
bool terminated = observer->terminated;
sc_mutex_unlock(&observer->mutex);
return terminated;
}
void
sc_process_observer_join(struct sc_process_observer *observer) {
sc_thread_join(&observer->thread, NULL);
}
void
sc_process_observer_destroy(struct sc_process_observer *observer) {
sc_cond_destroy(&observer->cond_terminated);
sc_mutex_destroy(&observer->mutex);
}

View file

@ -4,6 +4,7 @@
#include "common.h" #include "common.h"
#include <stdbool.h> #include <stdbool.h>
#include "util/thread.h"
#ifdef _WIN32 #ifdef _WIN32
@ -32,6 +33,34 @@
#endif #endif
struct sc_process_listener {
void (*on_terminated)(void *userdata);
};
/**
* Tool to observe process termination
*
* To keep things simple and multiplatform, it runs a separate thread to wait
* for process termination (without closing the process to avoid race
* conditions).
*
* It allows a caller to block until the process is terminated (with a
* timeout), and to be notified asynchronously from the observer thread.
*
* The process is not owned by the observer (the observer will never close it).
*/
struct sc_process_observer {
sc_pid pid;
sc_mutex mutex;
sc_cond cond_terminated;
bool terminated;
sc_thread thread;
const struct sc_process_listener *listener;
void *listener_userdata;
};
enum sc_process_result { enum sc_process_result {
SC_PROCESS_SUCCESS, SC_PROCESS_SUCCESS,
SC_PROCESS_ERROR_GENERIC, SC_PROCESS_ERROR_GENERIC,
@ -106,4 +135,42 @@ sc_pipe_read_all(sc_pipe pipe, char *data, size_t len);
void void
sc_pipe_close(sc_pipe pipe); sc_pipe_close(sc_pipe pipe);
/**
* Start observing process
*
* The listener is optional. If set, its callback will be called from the
* observer thread once the process is terminated.
*/
bool
sc_process_observer_init(struct sc_process_observer *observer, sc_pid pid,
const struct sc_process_listener *listener,
void *listener_userdata);
/**
* Wait for process termination until a deadline
*
* Return true if the process is already terminated. Return false if the
* process terminatation has not been detected yet (however, it may have
* terminated in the meantime).
*
* To wait without timeout/deadline, just use sc_process_wait() instead.
*/
bool
sc_process_observer_timedwait(struct sc_process_observer *observer,
sc_tick deadline);
/**
* Join the observer thread
*/
void
sc_process_observer_join(struct sc_process_observer *observer);
/**
* Destroy the observer
*
* This does not close the associated process.
*/
void
sc_process_observer_destroy(struct sc_process_observer *observer);
#endif #endif