Use VecDeque in controller
Replace cbuf by VecDeque in controller.
This commit is contained in:
parent
f25a67f342
commit
4d989de9ae
2 changed files with 35 additions and 17 deletions
|
@ -4,19 +4,28 @@
|
||||||
|
|
||||||
#include "util/log.h"
|
#include "util/log.h"
|
||||||
|
|
||||||
|
#define SC_CONTROL_MSG_QUEUE_MAX 64
|
||||||
|
|
||||||
bool
|
bool
|
||||||
sc_controller_init(struct sc_controller *controller, sc_socket control_socket,
|
sc_controller_init(struct sc_controller *controller, sc_socket control_socket,
|
||||||
struct sc_acksync *acksync) {
|
struct sc_acksync *acksync) {
|
||||||
cbuf_init(&controller->queue);
|
sc_vecdeque_init(&controller->queue);
|
||||||
|
|
||||||
bool ok = sc_receiver_init(&controller->receiver, control_socket, acksync);
|
bool ok = sc_vecdeque_reserve(&controller->queue, SC_CONTROL_MSG_QUEUE_MAX);
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ok = sc_receiver_init(&controller->receiver, control_socket, acksync);
|
||||||
|
if (!ok) {
|
||||||
|
sc_vecdeque_destroy(&controller->queue);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
ok = sc_mutex_init(&controller->mutex);
|
ok = sc_mutex_init(&controller->mutex);
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
sc_receiver_destroy(&controller->receiver);
|
sc_receiver_destroy(&controller->receiver);
|
||||||
|
sc_vecdeque_destroy(&controller->queue);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,6 +33,7 @@ sc_controller_init(struct sc_controller *controller, sc_socket control_socket,
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
sc_receiver_destroy(&controller->receiver);
|
sc_receiver_destroy(&controller->receiver);
|
||||||
sc_mutex_destroy(&controller->mutex);
|
sc_mutex_destroy(&controller->mutex);
|
||||||
|
sc_vecdeque_destroy(&controller->queue);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,10 +48,12 @@ sc_controller_destroy(struct sc_controller *controller) {
|
||||||
sc_cond_destroy(&controller->msg_cond);
|
sc_cond_destroy(&controller->msg_cond);
|
||||||
sc_mutex_destroy(&controller->mutex);
|
sc_mutex_destroy(&controller->mutex);
|
||||||
|
|
||||||
struct sc_control_msg msg;
|
while (!sc_vecdeque_is_empty(&controller->queue)) {
|
||||||
while (cbuf_take(&controller->queue, &msg)) {
|
struct sc_control_msg *msg = sc_vecdeque_popref(&controller->queue);
|
||||||
sc_control_msg_destroy(&msg);
|
assert(msg);
|
||||||
|
sc_control_msg_destroy(msg);
|
||||||
}
|
}
|
||||||
|
sc_vecdeque_destroy(&controller->queue);
|
||||||
|
|
||||||
sc_receiver_destroy(&controller->receiver);
|
sc_receiver_destroy(&controller->receiver);
|
||||||
}
|
}
|
||||||
|
@ -54,13 +66,19 @@ sc_controller_push_msg(struct sc_controller *controller,
|
||||||
}
|
}
|
||||||
|
|
||||||
sc_mutex_lock(&controller->mutex);
|
sc_mutex_lock(&controller->mutex);
|
||||||
bool was_empty = cbuf_is_empty(&controller->queue);
|
bool full = sc_vecdeque_is_full(&controller->queue);
|
||||||
bool res = cbuf_push(&controller->queue, *msg);
|
if (!full) {
|
||||||
if (was_empty) {
|
bool was_empty = sc_vecdeque_is_empty(&controller->queue);
|
||||||
sc_cond_signal(&controller->msg_cond);
|
sc_vecdeque_push_noresize(&controller->queue, *msg);
|
||||||
|
if (was_empty) {
|
||||||
|
sc_cond_signal(&controller->msg_cond);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
// Otherwise (if the queue is full), the msg is discarded
|
||||||
|
|
||||||
sc_mutex_unlock(&controller->mutex);
|
sc_mutex_unlock(&controller->mutex);
|
||||||
return res;
|
|
||||||
|
return !full;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
|
@ -82,7 +100,8 @@ run_controller(void *data) {
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
sc_mutex_lock(&controller->mutex);
|
sc_mutex_lock(&controller->mutex);
|
||||||
while (!controller->stopped && cbuf_is_empty(&controller->queue)) {
|
while (!controller->stopped
|
||||||
|
&& sc_vecdeque_is_empty(&controller->queue)) {
|
||||||
sc_cond_wait(&controller->msg_cond, &controller->mutex);
|
sc_cond_wait(&controller->msg_cond, &controller->mutex);
|
||||||
}
|
}
|
||||||
if (controller->stopped) {
|
if (controller->stopped) {
|
||||||
|
@ -90,10 +109,9 @@ run_controller(void *data) {
|
||||||
sc_mutex_unlock(&controller->mutex);
|
sc_mutex_unlock(&controller->mutex);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
struct sc_control_msg msg;
|
|
||||||
bool non_empty = cbuf_take(&controller->queue, &msg);
|
assert(!sc_vecdeque_is_empty(&controller->queue));
|
||||||
assert(non_empty);
|
struct sc_control_msg msg = sc_vecdeque_pop(&controller->queue);
|
||||||
(void) non_empty;
|
|
||||||
sc_mutex_unlock(&controller->mutex);
|
sc_mutex_unlock(&controller->mutex);
|
||||||
|
|
||||||
bool ok = process_msg(controller, &msg);
|
bool ok = process_msg(controller, &msg);
|
||||||
|
|
|
@ -8,11 +8,11 @@
|
||||||
#include "control_msg.h"
|
#include "control_msg.h"
|
||||||
#include "receiver.h"
|
#include "receiver.h"
|
||||||
#include "util/acksync.h"
|
#include "util/acksync.h"
|
||||||
#include "util/cbuf.h"
|
|
||||||
#include "util/net.h"
|
#include "util/net.h"
|
||||||
#include "util/thread.h"
|
#include "util/thread.h"
|
||||||
|
#include "util/vecdeque.h"
|
||||||
|
|
||||||
struct sc_control_msg_queue CBUF(struct sc_control_msg, 64);
|
struct sc_control_msg_queue SC_VECDEQUE(struct sc_control_msg);
|
||||||
|
|
||||||
struct sc_controller {
|
struct sc_controller {
|
||||||
sc_socket control_socket;
|
sc_socket control_socket;
|
||||||
|
|
Loading…
Reference in a new issue