From 7dec225cebbf208cf1d607bd45fb49e3a9fa1167 Mon Sep 17 00:00:00 2001 From: Romain Vimont Date: Wed, 2 Feb 2022 20:51:07 +0100 Subject: [PATCH] Rename stream to sc_demuxer For consistency with recorder and decoder, name the component which demuxes a "demuxer". And add the missing sc_ prefix. --- app/meson.build | 2 +- app/src/{stream.c => demuxer.c} | 142 ++++++++++++++++---------------- app/src/demuxer.h | 51 ++++++++++++ app/src/scrcpy.c | 38 ++++----- 4 files changed, 142 insertions(+), 91 deletions(-) rename app/src/{stream.c => demuxer.c} (55%) create mode 100644 app/src/demuxer.h diff --git a/app/meson.build b/app/meson.build index 7f675035..1d31a442 100644 --- a/app/meson.build +++ b/app/meson.build @@ -9,6 +9,7 @@ src = [ 'src/control_msg.c', 'src/controller.c', 'src/decoder.c', + 'src/demuxer.c', 'src/device_msg.c', 'src/icon.c', 'src/file_pusher.c', @@ -24,7 +25,6 @@ src = [ 'src/scrcpy.c', 'src/screen.c', 'src/server.c', - 'src/stream.c', 'src/video_buffer.c', 'src/util/acksync.c', 'src/util/file.c', diff --git a/app/src/stream.c b/app/src/demuxer.c similarity index 55% rename from app/src/stream.c rename to app/src/demuxer.c index c873c4ad..3dd62491 100644 --- a/app/src/stream.c +++ b/app/src/demuxer.c @@ -1,4 +1,4 @@ -#include "stream.h" +#include "demuxer.h" #include #include @@ -16,7 +16,7 @@ #define NO_PTS UINT64_C(-1) static bool -stream_recv_packet(struct stream *stream, AVPacket *packet) { +sc_demuxer_recv_packet(struct sc_demuxer *demuxer, AVPacket *packet) { // The video stream contains raw packets, without time information. When we // record, we retrieve the timestamps separately, from a "meta" header // added by the server before each raw packet. @@ -30,7 +30,7 @@ stream_recv_packet(struct stream *stream, AVPacket *packet) { // It is followed by bytes containing the packet/frame. uint8_t header[HEADER_SIZE]; - ssize_t r = net_recv_all(stream->socket, header, HEADER_SIZE); + ssize_t r = net_recv_all(demuxer->socket, header, HEADER_SIZE); if (r < HEADER_SIZE) { return false; } @@ -45,7 +45,7 @@ stream_recv_packet(struct stream *stream, AVPacket *packet) { return false; } - r = net_recv_all(stream->socket, packet->data, len); + r = net_recv_all(demuxer->socket, packet->data, len); if (r < 0 || ((uint32_t) r) < len) { av_packet_unref(packet); return false; @@ -57,9 +57,9 @@ stream_recv_packet(struct stream *stream, AVPacket *packet) { } static bool -push_packet_to_sinks(struct stream *stream, const AVPacket *packet) { - for (unsigned i = 0; i < stream->sink_count; ++i) { - struct sc_packet_sink *sink = stream->sinks[i]; +push_packet_to_sinks(struct sc_demuxer *demuxer, const AVPacket *packet) { + for (unsigned i = 0; i < demuxer->sink_count; ++i) { + struct sc_packet_sink *sink = demuxer->sinks[i]; if (!sink->ops->push(sink, packet)) { LOGE("Could not send config packet to sink %d", i); return false; @@ -70,12 +70,12 @@ push_packet_to_sinks(struct stream *stream, const AVPacket *packet) { } static bool -stream_parse(struct stream *stream, AVPacket *packet) { +sc_demuxer_parse(struct sc_demuxer *demuxer, AVPacket *packet) { uint8_t *in_data = packet->data; int in_len = packet->size; uint8_t *out_data = NULL; int out_len = 0; - int r = av_parser_parse2(stream->parser, stream->codec_ctx, + int r = av_parser_parse2(demuxer->parser, demuxer->codec_ctx, &out_data, &out_len, in_data, in_len, AV_NOPTS_VALUE, AV_NOPTS_VALUE, -1); @@ -84,13 +84,13 @@ stream_parse(struct stream *stream, AVPacket *packet) { (void) r; assert(out_len == in_len); - if (stream->parser->key_frame == 1) { + if (demuxer->parser->key_frame == 1) { packet->flags |= AV_PKT_FLAG_KEY; } packet->dts = packet->pts; - bool ok = push_packet_to_sinks(stream, packet); + bool ok = push_packet_to_sinks(demuxer, packet); if (!ok) { LOGE("Could not process packet"); return false; @@ -100,57 +100,57 @@ stream_parse(struct stream *stream, AVPacket *packet) { } static bool -stream_push_packet(struct stream *stream, AVPacket *packet) { +sc_demuxer_push_packet(struct sc_demuxer *demuxer, AVPacket *packet) { bool is_config = packet->pts == AV_NOPTS_VALUE; // A config packet must not be decoded immediately (it contains no // frame); instead, it must be concatenated with the future data packet. - if (stream->pending || is_config) { + if (demuxer->pending || is_config) { size_t offset; - if (stream->pending) { - offset = stream->pending->size; - if (av_grow_packet(stream->pending, packet->size)) { + if (demuxer->pending) { + offset = demuxer->pending->size; + if (av_grow_packet(demuxer->pending, packet->size)) { LOG_OOM(); return false; } } else { offset = 0; - stream->pending = av_packet_alloc(); - if (!stream->pending) { + demuxer->pending = av_packet_alloc(); + if (!demuxer->pending) { LOG_OOM(); return false; } - if (av_new_packet(stream->pending, packet->size)) { + if (av_new_packet(demuxer->pending, packet->size)) { LOG_OOM(); - av_packet_free(&stream->pending); + av_packet_free(&demuxer->pending); return false; } } - memcpy(stream->pending->data + offset, packet->data, packet->size); + memcpy(demuxer->pending->data + offset, packet->data, packet->size); if (!is_config) { // prepare the concat packet to send to the decoder - stream->pending->pts = packet->pts; - stream->pending->dts = packet->dts; - stream->pending->flags = packet->flags; - packet = stream->pending; + demuxer->pending->pts = packet->pts; + demuxer->pending->dts = packet->dts; + demuxer->pending->flags = packet->flags; + packet = demuxer->pending; } } if (is_config) { // config packet - bool ok = push_packet_to_sinks(stream, packet); + bool ok = push_packet_to_sinks(demuxer, packet); if (!ok) { return false; } } else { // data packet - bool ok = stream_parse(stream, packet); + bool ok = sc_demuxer_parse(demuxer, packet); - if (stream->pending) { + if (demuxer->pending) { // the pending packet must be discarded (consumed or error) - av_packet_free(&stream->pending); + av_packet_free(&demuxer->pending); } if (!ok) { @@ -161,25 +161,25 @@ stream_push_packet(struct stream *stream, AVPacket *packet) { } static void -stream_close_first_sinks(struct stream *stream, unsigned count) { +sc_demuxer_close_first_sinks(struct sc_demuxer *demuxer, unsigned count) { while (count) { - struct sc_packet_sink *sink = stream->sinks[--count]; + struct sc_packet_sink *sink = demuxer->sinks[--count]; sink->ops->close(sink); } } static inline void -stream_close_sinks(struct stream *stream) { - stream_close_first_sinks(stream, stream->sink_count); +sc_demuxer_close_sinks(struct sc_demuxer *demuxer) { + sc_demuxer_close_first_sinks(demuxer, demuxer->sink_count); } static bool -stream_open_sinks(struct stream *stream, const AVCodec *codec) { - for (unsigned i = 0; i < stream->sink_count; ++i) { - struct sc_packet_sink *sink = stream->sinks[i]; +sc_demuxer_open_sinks(struct sc_demuxer *demuxer, const AVCodec *codec) { + for (unsigned i = 0; i < demuxer->sink_count; ++i) { + struct sc_packet_sink *sink = demuxer->sinks[i]; if (!sink->ops->open(sink, codec)) { LOGE("Could not open packet sink %d", i); - stream_close_first_sinks(stream, i); + sc_demuxer_close_first_sinks(demuxer, i); return false; } } @@ -188,8 +188,8 @@ stream_open_sinks(struct stream *stream, const AVCodec *codec) { } static int -run_stream(void *data) { - struct stream *stream = data; +run_demuxer(void *data) { + struct sc_demuxer *demuxer = data; const AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_H264); if (!codec) { @@ -197,26 +197,26 @@ run_stream(void *data) { goto end; } - stream->codec_ctx = avcodec_alloc_context3(codec); - if (!stream->codec_ctx) { + demuxer->codec_ctx = avcodec_alloc_context3(codec); + if (!demuxer->codec_ctx) { LOG_OOM(); goto end; } - if (!stream_open_sinks(stream, codec)) { - LOGE("Could not open stream sinks"); + if (!sc_demuxer_open_sinks(demuxer, codec)) { + LOGE("Could not open demuxer sinks"); goto finally_free_codec_ctx; } - stream->parser = av_parser_init(AV_CODEC_ID_H264); - if (!stream->parser) { + demuxer->parser = av_parser_init(AV_CODEC_ID_H264); + if (!demuxer->parser) { LOGE("Could not initialize parser"); goto finally_close_sinks; } // We must only pass complete frames to av_parser_parse2()! // It's more complicated, but this allows to reduce the latency by 1 frame! - stream->parser->flags |= PARSER_FLAG_COMPLETE_FRAMES; + demuxer->parser->flags |= PARSER_FLAG_COMPLETE_FRAMES; AVPacket *packet = av_packet_alloc(); if (!packet) { @@ -225,13 +225,13 @@ run_stream(void *data) { } for (;;) { - bool ok = stream_recv_packet(stream, packet); + bool ok = sc_demuxer_recv_packet(demuxer, packet); if (!ok) { // end of stream break; } - ok = stream_push_packet(stream, packet); + ok = sc_demuxer_push_packet(demuxer, packet); av_packet_unref(packet); if (!ok) { // cannot process packet (error already logged) @@ -241,58 +241,58 @@ run_stream(void *data) { LOGD("End of frames"); - if (stream->pending) { - av_packet_free(&stream->pending); + if (demuxer->pending) { + av_packet_free(&demuxer->pending); } av_packet_free(&packet); finally_close_parser: - av_parser_close(stream->parser); + av_parser_close(demuxer->parser); finally_close_sinks: - stream_close_sinks(stream); + sc_demuxer_close_sinks(demuxer); finally_free_codec_ctx: - avcodec_free_context(&stream->codec_ctx); + avcodec_free_context(&demuxer->codec_ctx); end: - stream->cbs->on_eos(stream, stream->cbs_userdata); + demuxer->cbs->on_eos(demuxer, demuxer->cbs_userdata); return 0; } void -stream_init(struct stream *stream, sc_socket socket, - const struct stream_callbacks *cbs, void *cbs_userdata) { - stream->socket = socket; - stream->pending = NULL; - stream->sink_count = 0; +sc_demuxer_init(struct sc_demuxer *demuxer, sc_socket socket, + const struct sc_demuxer_callbacks *cbs, void *cbs_userdata) { + demuxer->socket = socket; + demuxer->pending = NULL; + demuxer->sink_count = 0; assert(cbs && cbs->on_eos); - stream->cbs = cbs; - stream->cbs_userdata = cbs_userdata; + demuxer->cbs = cbs; + demuxer->cbs_userdata = cbs_userdata; } void -stream_add_sink(struct stream *stream, struct sc_packet_sink *sink) { - assert(stream->sink_count < STREAM_MAX_SINKS); +sc_demuxer_add_sink(struct sc_demuxer *demuxer, struct sc_packet_sink *sink) { + assert(demuxer->sink_count < SC_DEMUXER_MAX_SINKS); assert(sink); assert(sink->ops); - stream->sinks[stream->sink_count++] = sink; + demuxer->sinks[demuxer->sink_count++] = sink; } bool -stream_start(struct stream *stream) { - LOGD("Starting stream thread"); +sc_demuxer_start(struct sc_demuxer *demuxer) { + LOGD("Starting demuxer thread"); - bool ok = - sc_thread_create(&stream->thread, run_stream, "scrcpy-stream", stream); + bool ok = sc_thread_create(&demuxer->thread, run_demuxer, "scrcpy-demuxer", + demuxer); if (!ok) { - LOGC("Could not start stream thread"); + LOGC("Could not start demuxer thread"); return false; } return true; } void -stream_join(struct stream *stream) { - sc_thread_join(&stream->thread, NULL); +sc_demuxer_join(struct sc_demuxer *demuxer) { + sc_thread_join(&demuxer->thread, NULL); } diff --git a/app/src/demuxer.h b/app/src/demuxer.h new file mode 100644 index 00000000..11e20ad6 --- /dev/null +++ b/app/src/demuxer.h @@ -0,0 +1,51 @@ +#ifndef SC_DEMUXER_H +#define SC_DEMUXER_H + +#include "common.h" + +#include +#include +#include +#include + +#include "trait/packet_sink.h" +#include "util/net.h" +#include "util/thread.h" + +#define SC_DEMUXER_MAX_SINKS 2 + +struct sc_demuxer { + sc_socket socket; + sc_thread thread; + + struct sc_packet_sink *sinks[SC_DEMUXER_MAX_SINKS]; + unsigned sink_count; + + AVCodecContext *codec_ctx; + AVCodecParserContext *parser; + // successive packets may need to be concatenated, until a non-config + // packet is available + AVPacket *pending; + + const struct sc_demuxer_callbacks *cbs; + void *cbs_userdata; +}; + +struct sc_demuxer_callbacks { + void (*on_eos)(struct sc_demuxer *demuxer, void *userdata); +}; + +void +sc_demuxer_init(struct sc_demuxer *demuxer, sc_socket socket, + const struct sc_demuxer_callbacks *cbs, void *cbs_userdata); + +void +sc_demuxer_add_sink(struct sc_demuxer *demuxer, struct sc_packet_sink *sink); + +bool +sc_demuxer_start(struct sc_demuxer *demuxer); + +void +sc_demuxer_join(struct sc_demuxer *demuxer); + +#endif diff --git a/app/src/scrcpy.c b/app/src/scrcpy.c index 91ee0a2a..270fe2b3 100644 --- a/app/src/scrcpy.c +++ b/app/src/scrcpy.c @@ -15,6 +15,7 @@ #include "controller.h" #include "decoder.h" +#include "demuxer.h" #include "events.h" #include "file_pusher.h" #include "keyboard_inject.h" @@ -22,7 +23,6 @@ #include "recorder.h" #include "screen.h" #include "server.h" -#include "stream.h" #ifdef HAVE_USB # include "usb/aoa_hid.h" # include "usb/hid_keyboard.h" @@ -39,7 +39,7 @@ struct scrcpy { struct sc_server server; struct sc_screen screen; - struct stream stream; + struct sc_demuxer demuxer; struct sc_decoder decoder; struct sc_recorder recorder; #ifdef HAVE_V4L2 @@ -231,8 +231,8 @@ av_log_callback(void *avcl, int level, const char *fmt, va_list vl) { } static void -stream_on_eos(struct stream *stream, void *userdata) { - (void) stream; +sc_demuxer_on_eos(struct sc_demuxer *demuxer, void *userdata) { + (void) demuxer; (void) userdata; PUSH_EVENT(EVENT_STREAM_STOPPED); @@ -285,7 +285,7 @@ scrcpy(struct scrcpy_options *options) { #ifdef HAVE_V4L2 bool v4l2_sink_initialized = false; #endif - bool stream_started = false; + bool demuxer_started = false; #ifdef HAVE_USB bool aoa_hid_initialized = false; bool hid_keyboard_initialized = false; @@ -395,17 +395,17 @@ scrcpy(struct scrcpy_options *options) { av_log_set_callback(av_log_callback); - static const struct stream_callbacks stream_cbs = { - .on_eos = stream_on_eos, + static const struct sc_demuxer_callbacks demuxer_cbs = { + .on_eos = sc_demuxer_on_eos, }; - stream_init(&s->stream, s->server.video_socket, &stream_cbs, NULL); + sc_demuxer_init(&s->demuxer, s->server.video_socket, &demuxer_cbs, NULL); if (dec) { - stream_add_sink(&s->stream, &dec->packet_sink); + sc_demuxer_add_sink(&s->demuxer, &dec->packet_sink); } if (rec) { - stream_add_sink(&s->stream, &rec->packet_sink); + sc_demuxer_add_sink(&s->demuxer, &rec->packet_sink); } struct sc_controller *controller = NULL; @@ -625,21 +625,21 @@ aoa_hid_end: #endif // now we consumed the header values, the socket receives the video stream - // start the stream - if (!stream_start(&s->stream)) { + // start the demuxer + if (!sc_demuxer_start(&s->demuxer)) { goto end; } - stream_started = true; + demuxer_started = true; ret = event_loop(s); LOGD("quit..."); // Close the window immediately on closing, because screen_destroy() may - // only be called once the stream thread is joined (it may take time) + // only be called once the demuxer thread is joined (it may take time) sc_screen_hide_window(&s->screen); end: - // The stream is not stopped explicitly, because it will stop by itself on + // The demuxer is not stopped explicitly, because it will stop by itself on // end-of-stream #ifdef HAVE_USB if (aoa_hid_initialized) { @@ -671,10 +671,10 @@ end: sc_server_stop(&s->server); } - // now that the sockets are shutdown, the stream and controller are + // now that the sockets are shutdown, the demuxer and controller are // interrupted, we can join them - if (stream_started) { - stream_join(&s->stream); + if (demuxer_started) { + sc_demuxer_join(&s->demuxer); } #ifdef HAVE_V4L2 @@ -693,7 +693,7 @@ end: } #endif - // Destroy the screen only after the stream is guaranteed to be finished, + // Destroy the screen only after the demuxer is guaranteed to be finished, // because otherwise the screen could receive new frames after destruction if (screen_initialized) { sc_screen_join(&s->screen);