diff --git a/app/meson.build b/app/meson.build index 4c4c807b..5942fd08 100644 --- a/app/meson.build +++ b/app/meson.build @@ -17,6 +17,7 @@ src = [ 'src/server.c', 'src/str_util.c', 'src/tiny_xpm.c', + 'src/stream.c', 'src/video_buffer.c', ] diff --git a/app/src/decoder.c b/app/src/decoder.c index c2008834..ef375fad 100644 --- a/app/src/decoder.c +++ b/app/src/decoder.c @@ -17,121 +17,6 @@ #include "recorder.h" #include "video_buffer.h" -#define BUFSIZE 0x10000 - -#define HEADER_SIZE 12 -#define NO_PTS UINT64_C(-1) - -static struct frame_meta *frame_meta_new(uint64_t pts) { - struct frame_meta *meta = malloc(sizeof(*meta)); - if (!meta) { - return meta; - } - meta->pts = pts; - meta->next = NULL; - return meta; -} - -static void frame_meta_delete(struct frame_meta *frame_meta) { - free(frame_meta); -} - -static SDL_bool receiver_state_push_meta(struct receiver_state *state, - uint64_t pts) { - struct frame_meta *frame_meta = frame_meta_new(pts); - if (!frame_meta) { - return SDL_FALSE; - } - - // append to the list - // (iterate to find the last item, in practice the list should be tiny) - struct frame_meta **p = &state->frame_meta_queue; - while (*p) { - p = &(*p)->next; - } - *p = frame_meta; - return SDL_TRUE; -} - -static uint64_t receiver_state_take_meta(struct receiver_state *state) { - struct frame_meta *frame_meta = state->frame_meta_queue; // first item - SDL_assert(frame_meta); // must not be empty - uint64_t pts = frame_meta->pts; - state->frame_meta_queue = frame_meta->next; // remove the item - frame_meta_delete(frame_meta); - return pts; -} - -static int read_packet_with_meta(void *opaque, uint8_t *buf, int buf_size) { - struct decoder *decoder = opaque; - struct receiver_state *state = &decoder->receiver_state; - - // The video stream contains raw packets, without time information. When we - // record, we retrieve the timestamps separately, from a "meta" header - // added by the server before each raw packet. - // - // The "meta" header length is 12 bytes: - // [. . . . . . . .|. . . .]. . . . . . . . . . . . . . . ... - // <-------------> <-----> <-----------------------------... - // PTS packet raw packet - // size - // - // It is followed by bytes containing the packet/frame. - - if (!state->remaining) { -#define HEADER_SIZE 12 - uint8_t header[HEADER_SIZE]; - ssize_t r = net_recv_all(decoder->video_socket, header, HEADER_SIZE); - if (r == -1) { - return AVERROR(errno); - } - if (r == 0) { - return AVERROR_EOF; - } - // no partial read (net_recv_all()) - SDL_assert_release(r == HEADER_SIZE); - - uint64_t pts = buffer_read64be(header); - state->remaining = buffer_read32be(&header[8]); - - if (pts != NO_PTS && !receiver_state_push_meta(state, pts)) { - LOGE("Could not store PTS for recording"); - // we cannot save the PTS, the recording would be broken - return AVERROR(ENOMEM); - } - } - - SDL_assert(state->remaining); - - if (buf_size > state->remaining) - buf_size = state->remaining; - - ssize_t r = net_recv(decoder->video_socket, buf, buf_size); - if (r == -1) { - return AVERROR(errno); - } - if (r == 0) { - return AVERROR_EOF; - } - - SDL_assert(state->remaining >= r); - state->remaining -= r; - - return r; -} - -static int read_raw_packet(void *opaque, uint8_t *buf, int buf_size) { - struct decoder *decoder = opaque; - ssize_t r = net_recv(decoder->video_socket, buf, buf_size); - if (r == -1) { - return AVERROR(errno); - } - if (r == 0) { - return AVERROR_EOF; - } - return r; -} - // set the decoded frame as ready for rendering, and notify static void push_frame(struct decoder *decoder) { SDL_bool previous_frame_consumed = video_buffer_offer_decoded_frame(decoder->video_buffer); @@ -145,177 +30,66 @@ static void push_frame(struct decoder *decoder) { SDL_PushEvent(&new_frame_event); } -static void notify_stopped(void) { - SDL_Event stop_event; - stop_event.type = EVENT_DECODER_STOPPED; - SDL_PushEvent(&stop_event); -} - -static int run_decoder(void *data) { - struct decoder *decoder = data; - - AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_H264); - if (!codec) { - LOGE("H.264 decoder not found"); - goto run_end; - } - - AVCodecContext *codec_ctx = avcodec_alloc_context3(codec); - if (!codec_ctx) { - LOGC("Could not allocate decoder context"); - goto run_end; - } - - if (avcodec_open2(codec_ctx, codec, NULL) < 0) { - LOGE("Could not open H.264 codec"); - goto run_finally_free_codec_ctx; - } - - AVFormatContext *format_ctx = avformat_alloc_context(); - if (!format_ctx) { - LOGC("Could not allocate format context"); - goto run_finally_close_codec; - } - - unsigned char *buffer = av_malloc(BUFSIZE); - if (!buffer) { - LOGC("Could not allocate buffer"); - goto run_finally_free_format_ctx; - } - - // initialize the receiver state - decoder->receiver_state.frame_meta_queue = NULL; - decoder->receiver_state.remaining = 0; - - // if recording is enabled, a "header" is sent between raw packets - int (*read_packet)(void *, uint8_t *, int) = - decoder->recorder ? read_packet_with_meta : read_raw_packet; - AVIOContext *avio_ctx = avio_alloc_context(buffer, BUFSIZE, 0, decoder, - read_packet, NULL, NULL); - if (!avio_ctx) { - LOGC("Could not allocate avio context"); - // avformat_open_input takes ownership of 'buffer' - // so only free the buffer before avformat_open_input() - av_free(buffer); - goto run_finally_free_format_ctx; - } - - format_ctx->pb = avio_ctx; - - if (avformat_open_input(&format_ctx, NULL, NULL, NULL) < 0) { - LOGE("Could not open video stream"); - goto run_finally_free_avio_ctx; - } - - if (decoder->recorder && - !recorder_open(decoder->recorder, codec)) { - LOGE("Could not open recorder"); - goto run_finally_close_input; - } - - AVPacket packet; - av_init_packet(&packet); - packet.data = NULL; - packet.size = 0; - - while (!av_read_frame(format_ctx, &packet)) { -// the new decoding/encoding API has been introduced by: -// -#ifdef SCRCPY_LAVF_HAS_NEW_ENCODING_DECODING_API - int ret; - if ((ret = avcodec_send_packet(codec_ctx, &packet)) < 0) { - LOGE("Could not send video packet: %d", ret); - goto run_quit; - } - ret = avcodec_receive_frame(codec_ctx, decoder->video_buffer->decoding_frame); - if (!ret) { - // a frame was received - push_frame(decoder); - } else if (ret != AVERROR(EAGAIN)) { - LOGE("Could not receive video frame: %d", ret); - av_packet_unref(&packet); - goto run_quit; - } -#else - int got_picture; - int len = avcodec_decode_video2(codec_ctx, decoder->video_buffer->decoding_frame, &got_picture, &packet); - if (len < 0) { - LOGE("Could not decode video packet: %d", len); - av_packet_unref(&packet); - goto run_quit; - } - if (got_picture) { - push_frame(decoder); - } -#endif - - if (decoder->recorder) { - // we retrieve the PTS in order they were received, so they will - // be assigned to the correct frame - uint64_t pts = receiver_state_take_meta(&decoder->receiver_state); - packet.pts = pts; - packet.dts = pts; - - // no need to rescale with av_packet_rescale_ts(), the timestamps - // are in microseconds both in input and output - if (!recorder_write(decoder->recorder, &packet)) { - LOGE("Could not write frame to output file"); - av_packet_unref(&packet); - goto run_quit; - } - } - - av_packet_unref(&packet); - - if (avio_ctx->eof_reached) { - break; - } - } - - LOGD("End of frames"); - -run_quit: - if (decoder->recorder) { - recorder_close(decoder->recorder); - } -run_finally_close_input: - avformat_close_input(&format_ctx); -run_finally_free_avio_ctx: - av_free(avio_ctx->buffer); - av_free(avio_ctx); -run_finally_free_format_ctx: - avformat_free_context(format_ctx); -run_finally_close_codec: - avcodec_close(codec_ctx); -run_finally_free_codec_ctx: - avcodec_free_context(&codec_ctx); - notify_stopped(); -run_end: - return 0; -} - -void decoder_init(struct decoder *decoder, struct video_buffer *vb, - socket_t video_socket, struct recorder *recorder) { +void decoder_init(struct decoder *decoder, struct video_buffer *vb) { decoder->video_buffer = vb; - decoder->video_socket = video_socket; - decoder->recorder = recorder; } -SDL_bool decoder_start(struct decoder *decoder) { - LOGD("Starting decoder thread"); - - decoder->thread = SDL_CreateThread(run_decoder, "video_decoder", decoder); - if (!decoder->thread) { - LOGC("Could not start decoder thread"); +SDL_bool decoder_open(struct decoder *decoder, AVCodec *codec) { + decoder->codec_ctx = avcodec_alloc_context3(codec); + if (!decoder->codec_ctx) { + LOGC("Could not allocate decoder context"); return SDL_FALSE; } + + if (avcodec_open2(decoder->codec_ctx, codec, NULL) < 0) { + LOGE("Could not open codec"); + avcodec_free_context(&decoder->codec_ctx); + return SDL_FALSE; + } + return SDL_TRUE; } -void decoder_stop(struct decoder *decoder) { - video_buffer_interrupt(decoder->video_buffer); +void decoder_close(struct decoder *decoder) { + avcodec_close(decoder->codec_ctx); + avcodec_free_context(&decoder->codec_ctx); } -void decoder_join(struct decoder *decoder) { - SDL_WaitThread(decoder->thread, NULL); +SDL_bool decoder_push(struct decoder *decoder, AVPacket *packet) { +// the new decoding/encoding API has been introduced by: +// +#ifdef SCRCPY_LAVF_HAS_NEW_ENCODING_DECODING_API + int ret; + if ((ret = avcodec_send_packet(decoder->codec_ctx, packet)) < 0) { + LOGE("Could not send video packet: %d", ret); + return SDL_FALSE; + } + ret = avcodec_receive_frame(decoder->codec_ctx, + decoder->video_buffer->decoding_frame); + if (!ret) { + // a frame was received + push_frame(decoder); + } else if (ret != AVERROR(EAGAIN)) { + LOGE("Could not receive video frame: %d", ret); + return SDL_FALSE; + } +#else + int got_picture; + int len = avcodec_decode_video2(decoder->codec_ctx, + decoder->video_buffer->decoding_frame, + &got_picture, + packet); + if (len < 0) { + LOGE("Could not decode video packet: %d", len); + return SDL_FALSE; + } + if (got_picture) { + push_frame(decoder); + } +#endif + return SDL_TRUE; +} + +void decoder_interrupt(struct decoder *decoder) { + video_buffer_interrupt(decoder->video_buffer); } diff --git a/app/src/decoder.h b/app/src/decoder.h index 240dab00..ac8308c7 100644 --- a/app/src/decoder.h +++ b/app/src/decoder.h @@ -1,35 +1,22 @@ #ifndef DECODER_H #define DECODER_H +#include #include -#include - -#include "common.h" -#include "net.h" struct video_buffer; -struct frame_meta { - uint64_t pts; - struct frame_meta *next; -}; - struct decoder { struct video_buffer *video_buffer; - socket_t video_socket; - SDL_Thread *thread; - struct recorder *recorder; - struct receiver_state { - // meta (in order) for frames not consumed yet - struct frame_meta *frame_meta_queue; - size_t remaining; // remaining bytes to receive for the current frame - } receiver_state; + AVCodecContext *codec_ctx; }; -void decoder_init(struct decoder *decoder, struct video_buffer *vb, - socket_t video_socket, struct recorder *recoder); -SDL_bool decoder_start(struct decoder *decoder); -void decoder_stop(struct decoder *decoder); -void decoder_join(struct decoder *decoder); +void decoder_init(struct decoder *decoder, struct video_buffer *vb); + +SDL_bool decoder_open(struct decoder *decoder, AVCodec *codec); +void decoder_close(struct decoder *decoder); + +SDL_bool decoder_push(struct decoder *decoder, AVPacket *packet); +void decoder_interrupt(struct decoder *decoder); #endif diff --git a/app/src/events.h b/app/src/events.h index ff0c1a05..e9512048 100644 --- a/app/src/events.h +++ b/app/src/events.h @@ -1,3 +1,3 @@ #define EVENT_NEW_SESSION SDL_USEREVENT #define EVENT_NEW_FRAME (SDL_USEREVENT + 1) -#define EVENT_DECODER_STOPPED (SDL_USEREVENT + 2) +#define EVENT_STREAM_STOPPED (SDL_USEREVENT + 2) diff --git a/app/src/scrcpy.c b/app/src/scrcpy.c index 6880ee90..2bb9b280 100644 --- a/app/src/scrcpy.c +++ b/app/src/scrcpy.c @@ -22,16 +22,18 @@ #include "recorder.h" #include "screen.h" #include "server.h" +#include "stream.h" #include "tiny_xpm.h" #include "video_buffer.h" static struct server server = SERVER_INITIALIZER; static struct screen screen = SCREEN_INITIALIZER; static struct video_buffer video_buffer; +static struct stream stream; static struct decoder decoder; +static struct recorder recorder; static struct controller controller; static struct file_handler file_handler; -static struct recorder recorder; static struct input_manager input_manager = { .controller = &controller, @@ -70,8 +72,8 @@ static SDL_bool event_loop(void) { SDL_Event event; while (SDL_WaitEvent(&event)) { switch (event.type) { - case EVENT_DECODER_STOPPED: - LOGD("Video decoder stopped"); + case EVENT_STREAM_STOPPED: + LOGD("Video stream stopped"); return SDL_FALSE; case SDL_QUIT: LOGD("User requested to quit"); @@ -227,6 +229,8 @@ SDL_bool scrcpy(const struct scrcpy_options *options) { goto finally_destroy_video_buffer; } + decoder_init(&decoder, &video_buffer); + struct recorder *rec = NULL; if (record) { if (!recorder_init(&recorder, @@ -242,11 +246,11 @@ SDL_bool scrcpy(const struct scrcpy_options *options) { av_log_set_callback(av_log_callback); - decoder_init(&decoder, &video_buffer, device_socket, rec); + stream_init(&stream, device_socket, &decoder, rec); // now we consumed the header values, the socket receives the video stream - // start the decoder - if (!decoder_start(&decoder)) { + // start the stream + if (!stream_start(&stream)) { ret = SDL_FALSE; server_stop(&server); goto finally_destroy_recorder; @@ -254,7 +258,7 @@ SDL_bool scrcpy(const struct scrcpy_options *options) { if (!controller_init(&controller, device_socket)) { ret = SDL_FALSE; - goto finally_stop_decoder; + goto finally_stop_stream; } if (!controller_start(&controller)) { @@ -286,11 +290,11 @@ finally_stop_and_join_controller: controller_join(&controller); finally_destroy_controller: controller_destroy(&controller); -finally_stop_decoder: - decoder_stop(&decoder); - // stop the server before decoder_join() to wake up the decoder +finally_stop_stream: + stream_stop(&stream); + // stop the server before stream_join() to wake up the stream server_stop(&server); - decoder_join(&decoder); + stream_join(&stream); finally_destroy_recorder: if (record) { recorder_destroy(&recorder); diff --git a/app/src/stream.c b/app/src/stream.c new file mode 100644 index 00000000..73bb95d1 --- /dev/null +++ b/app/src/stream.c @@ -0,0 +1,275 @@ +#include "stream.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "compat.h" +#include "config.h" +#include "buffer_util.h" +#include "decoder.h" +#include "events.h" +#include "lock_util.h" +#include "log.h" +#include "recorder.h" + +#define BUFSIZE 0x10000 + +#define HEADER_SIZE 12 +#define NO_PTS UINT64_C(-1) + +static struct frame_meta *frame_meta_new(Uint64 pts) { + struct frame_meta *meta = malloc(sizeof(*meta)); + if (!meta) { + return meta; + } + meta->pts = pts; + meta->next = NULL; + return meta; +} + +static void frame_meta_delete(struct frame_meta *frame_meta) { + free(frame_meta); +} + +static SDL_bool receiver_state_push_meta(struct receiver_state *state, + uint64_t pts) { + struct frame_meta *frame_meta = frame_meta_new(pts); + if (!frame_meta) { + return SDL_FALSE; + } + + // append to the list + // (iterate to find the last item, in practice the list should be tiny) + struct frame_meta **p = &state->frame_meta_queue; + while (*p) { + p = &(*p)->next; + } + *p = frame_meta; + return SDL_TRUE; +} + +static uint64_t receiver_state_take_meta(struct receiver_state *state) { + struct frame_meta *frame_meta = state->frame_meta_queue; // first item + SDL_assert(frame_meta); // must not be empty + uint64_t pts = frame_meta->pts; + state->frame_meta_queue = frame_meta->next; // remove the item + frame_meta_delete(frame_meta); + return pts; +} + +static int read_packet_with_meta(void *opaque, uint8_t *buf, int buf_size) { + struct stream *stream = opaque; + struct receiver_state *state = &stream->receiver_state; + + // The video stream contains raw packets, without time information. When we + // record, we retrieve the timestamps separately, from a "meta" header + // added by the server before each raw packet. + // + // The "meta" header length is 12 bytes: + // [. . . . . . . .|. . . .]. . . . . . . . . . . . . . . ... + // <-------------> <-----> <-----------------------------... + // PTS packet raw packet + // size + // + // It is followed by bytes containing the packet/frame. + + if (!state->remaining) { +#define HEADER_SIZE 12 + uint8_t header[HEADER_SIZE]; + ssize_t r = net_recv_all(stream->socket, header, HEADER_SIZE); + if (r == -1) { + return AVERROR(errno); + } + if (r == 0) { + return AVERROR_EOF; + } + // no partial read (net_recv_all()) + SDL_assert_release(r == HEADER_SIZE); + + uint64_t pts = buffer_read64be(header); + state->remaining = buffer_read32be(&header[8]); + + if (pts != NO_PTS && !receiver_state_push_meta(state, pts)) { + LOGE("Could not store PTS for recording"); + // we cannot save the PTS, the recording would be broken + return AVERROR(ENOMEM); + } + } + + SDL_assert(state->remaining); + + if (buf_size > state->remaining) { + buf_size = state->remaining; + } + + ssize_t r = net_recv(stream->socket, buf, buf_size); + if (r == -1) { + return AVERROR(errno); + } + if (r == 0) { + return AVERROR_EOF; + } + + SDL_assert(state->remaining >= r); + state->remaining -= r; + + return r; +} + +static int read_raw_packet(void *opaque, uint8_t *buf, int buf_size) { + struct stream *stream = opaque; + ssize_t r = net_recv(stream->socket, buf, buf_size); + if (r == -1) { + return AVERROR(errno); + } + if (r == 0) { + return AVERROR_EOF; + } + return r; +} + +static void notify_stopped(void) { + SDL_Event stop_event; + stop_event.type = EVENT_STREAM_STOPPED; + SDL_PushEvent(&stop_event); +} + +static int run_stream(void *data) { + struct stream *stream = data; + + AVFormatContext *format_ctx = avformat_alloc_context(); + if (!format_ctx) { + LOGC("Could not allocate format context"); + goto end; + } + + unsigned char *buffer = av_malloc(BUFSIZE); + if (!buffer) { + LOGC("Could not allocate buffer"); + goto finally_free_format_ctx; + } + + // initialize the receiver state + stream->receiver_state.frame_meta_queue = NULL; + stream->receiver_state.remaining = 0; + + // if recording is enabled, a "header" is sent between raw packets + int (*read_packet)(void *, uint8_t *, int) = + stream->recorder ? read_packet_with_meta : read_raw_packet; + AVIOContext *avio_ctx = avio_alloc_context(buffer, BUFSIZE, 0, stream, + read_packet, NULL, NULL); + if (!avio_ctx) { + LOGC("Could not allocate avio context"); + // avformat_open_input takes ownership of 'buffer' + // so only free the buffer before avformat_open_input() + av_free(buffer); + goto finally_free_format_ctx; + } + + format_ctx->pb = avio_ctx; + + if (avformat_open_input(&format_ctx, NULL, NULL, NULL) < 0) { + LOGE("Could not open video stream"); + goto finally_free_avio_ctx; + } + + AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_H264); + if (!codec) { + LOGE("H.264 decoder not found"); + goto end; + } + + if (stream->decoder && !decoder_open(stream->decoder, codec)) { + LOGE("Could not open decoder"); + goto finally_close_input; + } + + if (stream->recorder && !recorder_open(stream->recorder, codec)) { + LOGE("Could not open recorder"); + goto finally_close_input; + } + + AVPacket packet; + av_init_packet(&packet); + packet.data = NULL; + packet.size = 0; + + while (!av_read_frame(format_ctx, &packet)) { + if (stream->decoder && !decoder_push(stream->decoder, &packet)) { + av_packet_unref(&packet); + goto quit; + } + + if (stream->recorder) { + // we retrieve the PTS in order they were received, so they will + // be assigned to the correct frame + Uint64 pts = receiver_state_take_meta(&stream->receiver_state); + packet.pts = pts; + packet.dts = pts; + + // no need to rescale with av_packet_rescale_ts(), the timestamps + // are in microseconds both in input and output + if (!recorder_write(stream->recorder, &packet)) { + LOGE("Could not write frame to output file"); + av_packet_unref(&packet); + goto quit; + } + } + + av_packet_unref(&packet); + + if (avio_ctx->eof_reached) { + break; + } + } + + LOGD("End of frames"); + +quit: + if (stream->recorder) { + recorder_close(stream->recorder); + } +finally_close_input: + avformat_close_input(&format_ctx); +finally_free_avio_ctx: + av_free(avio_ctx->buffer); + av_free(avio_ctx); +finally_free_format_ctx: + avformat_free_context(format_ctx); +end: + notify_stopped(); + return 0; +} + +void stream_init(struct stream *stream, socket_t socket, + struct decoder *decoder, struct recorder *recorder) { + stream->socket = socket; + stream->decoder = decoder, + stream->recorder = recorder; +} + +SDL_bool stream_start(struct stream *stream) { + LOGD("Starting stream thread"); + + stream->thread = SDL_CreateThread(run_stream, "stream", stream); + if (!stream->thread) { + LOGC("Could not start stream thread"); + return SDL_FALSE; + } + return SDL_TRUE; +} + +void stream_stop(struct stream *stream) { + if (stream->decoder) { + decoder_interrupt(stream->decoder); + } +} + +void stream_join(struct stream *stream) { + SDL_WaitThread(stream->thread, NULL); +} diff --git a/app/src/stream.h b/app/src/stream.h new file mode 100644 index 00000000..ef78b321 --- /dev/null +++ b/app/src/stream.h @@ -0,0 +1,35 @@ +#ifndef STREAM_H +#define STREAM_H + +#include +#include + +#include "net.h" + +struct video_buffer; + +struct frame_meta { + Uint64 pts; + struct frame_meta *next; +}; + +struct stream { + socket_t socket; + struct video_buffer *video_buffer; + SDL_Thread *thread; + struct decoder *decoder; + struct recorder *recorder; + struct receiver_state { + // meta (in order) for frames not consumed yet + struct frame_meta *frame_meta_queue; + size_t remaining; // remaining bytes to receive for the current frame + } receiver_state; +}; + +void stream_init(struct stream *stream, socket_t socket, + struct decoder *decoder, struct recorder *recorder); +SDL_bool stream_start(struct stream *stream); +void stream_stop(struct stream *stream); +void stream_join(struct stream *stream); + +#endif