Open recording file from the recorder thread

The recorder opened the target file from the packet sink open()
callback, called by the demuxer. Only then the recorder thread was
started.

One golden rule for the recorder is to never block the demuxer for I/O,
because it would impact mirroring. This rule is respected on recording
packets, but not for the initial recorder opening.

Therefore, start the recorder thread from sc_recorder_init(), open the
file immediately from the recorder thread, then make it wait for the
stream to start (on packet sink open()).

Now that the recorder can report errors directly (rather than making the
demuxer call fail), it is possible to report file opening error even
before the packet sink is open.
This commit is contained in:
Romain Vimont 2023-02-14 09:25:50 +01:00
parent 6b5dfef923
commit a039124d5d
4 changed files with 178 additions and 91 deletions

View file

@ -132,10 +132,76 @@ sc_recorder_write(struct sc_recorder *recorder, AVPacket *packet) {
return av_write_frame(recorder->ctx, packet) >= 0; return av_write_frame(recorder->ctx, packet) >= 0;
} }
static int static bool
run_recorder(void *data) { sc_recorder_open_output_file(struct sc_recorder *recorder) {
struct sc_recorder *recorder = data; const char *format_name = sc_recorder_get_format_name(recorder->format);
assert(format_name);
const AVOutputFormat *format = find_muxer(format_name);
if (!format) {
LOGE("Could not find muxer");
return false;
}
recorder->ctx = avformat_alloc_context();
if (!recorder->ctx) {
LOG_OOM();
return false;
}
int ret = avio_open(&recorder->ctx->pb, recorder->filename,
AVIO_FLAG_WRITE);
if (ret < 0) {
LOGE("Failed to open output file: %s", recorder->filename);
avformat_free_context(recorder->ctx);
return false;
}
// contrary to the deprecated API (av_oformat_next()), av_muxer_iterate()
// returns (on purpose) a pointer-to-const, but AVFormatContext.oformat
// still expects a pointer-to-non-const (it has not be updated accordingly)
// <https://github.com/FFmpeg/FFmpeg/commit/0694d8702421e7aff1340038559c438b61bb30dd>
recorder->ctx->oformat = (AVOutputFormat *) format;
av_dict_set(&recorder->ctx->metadata, "comment",
"Recorded by scrcpy " SCRCPY_VERSION, 0);
LOGI("Recording started to %s file: %s", format_name, recorder->filename);
return true;
}
static void
sc_recorder_close_output_file(struct sc_recorder *recorder) {
avio_close(recorder->ctx->pb);
avformat_free_context(recorder->ctx);
}
static bool
sc_recorder_wait_video_stream(struct sc_recorder *recorder) {
sc_mutex_lock(&recorder->mutex);
while (!recorder->codec && !recorder->stopped) {
sc_cond_wait(&recorder->stream_cond, &recorder->mutex);
}
const AVCodec *codec = recorder->codec;
sc_mutex_unlock(&recorder->mutex);
if (codec) {
AVStream *ostream = avformat_new_stream(recorder->ctx, codec);
if (!ostream) {
return false;
}
ostream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
ostream->codecpar->codec_id = codec->id;
ostream->codecpar->format = AV_PIX_FMT_YUV420P;
ostream->codecpar->width = recorder->declared_frame_size.width;
ostream->codecpar->height = recorder->declared_frame_size.height;
}
return true;
}
static bool
sc_recorder_process_packets(struct sc_recorder *recorder) {
int64_t pts_origin = AV_NOPTS_VALUE; int64_t pts_origin = AV_NOPTS_VALUE;
// We can write a packet only once we received the next one so that we can // We can write a packet only once we received the next one so that we can
@ -206,42 +272,70 @@ run_recorder(void *data) {
if (!ok) { if (!ok) {
LOGE("Could not record packet"); LOGE("Could not record packet");
sc_mutex_lock(&recorder->mutex); return false;
recorder->failed = true;
// discard pending packets
sc_recorder_queue_clear(&recorder->queue);
sc_mutex_unlock(&recorder->mutex);
break;
} }
previous = rec; previous = rec;
} }
if (!recorder->failed) { if (!recorder->header_written) {
if (recorder->header_written) { // the recorded file is empty
int ret = av_write_trailer(recorder->ctx); return false;
if (ret < 0) {
LOGE("Failed to write trailer to %s", recorder->filename);
recorder->failed = true;
}
} else {
// the recorded file is empty
recorder->failed = true;
}
} }
if (recorder->failed) { int ret = av_write_trailer(recorder->ctx);
LOGE("Recording failed to %s", recorder->filename); if (ret < 0) {
} else { LOGE("Failed to write trailer to %s", recorder->filename);
return false;
}
return true;
}
static bool
sc_recorder_record(struct sc_recorder *recorder) {
bool ok = sc_recorder_open_output_file(recorder);
if (!ok) {
return false;
}
ok = sc_recorder_wait_video_stream(recorder);
if (!ok) {
sc_recorder_close_output_file(recorder);
return false;
}
// If recorder->stopped, process any queued packet anyway
ok = sc_recorder_process_packets(recorder);
sc_recorder_close_output_file(recorder);
return ok;
}
static int
run_recorder(void *data) {
struct sc_recorder *recorder = data;
bool success = sc_recorder_record(recorder);
sc_mutex_lock(&recorder->mutex);
// Prevent the producer to push any new packet
recorder->stopped = true;
// Discard pending packets
sc_recorder_queue_clear(&recorder->queue);
sc_mutex_unlock(&recorder->mutex);
if (success) {
const char *format_name = sc_recorder_get_format_name(recorder->format); const char *format_name = sc_recorder_get_format_name(recorder->format);
LOGI("Recording complete to %s file: %s", format_name, LOGI("Recording complete to %s file: %s", format_name,
recorder->filename); recorder->filename);
} else {
LOGE("Recording failed to %s", recorder->filename);
} }
LOGD("Recorder thread ended"); LOGD("Recorder thread ended");
recorder->cbs->on_ended(recorder, !recorder->failed, recorder->cbs->on_ended(recorder, success, recorder->cbs_userdata);
recorder->cbs_userdata);
return 0; return 0;
} }
@ -252,66 +346,17 @@ sc_recorder_packet_sink_open(struct sc_packet_sink *sink,
struct sc_recorder *recorder = DOWNCAST(sink); struct sc_recorder *recorder = DOWNCAST(sink);
assert(codec); assert(codec);
const char *format_name = sc_recorder_get_format_name(recorder->format); sc_mutex_lock(&recorder->mutex);
assert(format_name); if (recorder->stopped) {
const AVOutputFormat *format = find_muxer(format_name); sc_mutex_unlock(&recorder->mutex);
if (!format) {
LOGE("Could not find muxer");
return false; return false;
} }
recorder->ctx = avformat_alloc_context(); recorder->codec = codec;
if (!recorder->ctx) { sc_cond_signal(&recorder->stream_cond);
LOG_OOM(); sc_mutex_unlock(&recorder->mutex);
return false;
}
// contrary to the deprecated API (av_oformat_next()), av_muxer_iterate()
// returns (on purpose) a pointer-to-const, but AVFormatContext.oformat
// still expects a pointer-to-non-const (it has not be updated accordingly)
// <https://github.com/FFmpeg/FFmpeg/commit/0694d8702421e7aff1340038559c438b61bb30dd>
recorder->ctx->oformat = (AVOutputFormat *) format;
av_dict_set(&recorder->ctx->metadata, "comment",
"Recorded by scrcpy " SCRCPY_VERSION, 0);
AVStream *ostream = avformat_new_stream(recorder->ctx, codec);
if (!ostream) {
goto error_avformat_free_context;
}
ostream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
ostream->codecpar->codec_id = codec->id;
ostream->codecpar->format = AV_PIX_FMT_YUV420P;
ostream->codecpar->width = recorder->declared_frame_size.width;
ostream->codecpar->height = recorder->declared_frame_size.height;
int ret = avio_open(&recorder->ctx->pb, recorder->filename,
AVIO_FLAG_WRITE);
if (ret < 0) {
LOGE("Failed to open output file: %s", recorder->filename);
// ostream will be cleaned up during context cleaning
goto error_avformat_free_context;
}
LOGD("Starting recorder thread");
bool ok = sc_thread_create(&recorder->thread, run_recorder,
"scrcpy-recorder", recorder);
if (!ok) {
LOGE("Could not start recorder thread");
goto error_avio_close;
}
LOGI("Recording started to %s file: %s", format_name, recorder->filename);
return true; return true;
error_avio_close:
avio_close(recorder->ctx->pb);
error_avformat_free_context:
avformat_free_context(recorder->ctx);
return false;
} }
static void static void
@ -319,14 +364,10 @@ sc_recorder_packet_sink_close(struct sc_packet_sink *sink) {
struct sc_recorder *recorder = DOWNCAST(sink); struct sc_recorder *recorder = DOWNCAST(sink);
sc_mutex_lock(&recorder->mutex); sc_mutex_lock(&recorder->mutex);
// EOS also stops the recorder
recorder->stopped = true; recorder->stopped = true;
sc_cond_signal(&recorder->queue_cond); sc_cond_signal(&recorder->queue_cond);
sc_mutex_unlock(&recorder->mutex); sc_mutex_unlock(&recorder->mutex);
sc_thread_join(&recorder->thread, NULL);
avio_close(recorder->ctx->pb);
avformat_free_context(recorder->ctx);
} }
static bool static bool
@ -335,10 +376,9 @@ sc_recorder_packet_sink_push(struct sc_packet_sink *sink,
struct sc_recorder *recorder = DOWNCAST(sink); struct sc_recorder *recorder = DOWNCAST(sink);
sc_mutex_lock(&recorder->mutex); sc_mutex_lock(&recorder->mutex);
assert(!recorder->stopped);
if (recorder->failed) { if (recorder->stopped) {
// reject any new packet (this will stop the stream) // reject any new packet
sc_mutex_unlock(&recorder->mutex); sc_mutex_unlock(&recorder->mutex);
return false; return false;
} }
@ -378,11 +418,17 @@ sc_recorder_init(struct sc_recorder *recorder, const char *filename,
goto error_mutex_destroy; goto error_mutex_destroy;
} }
ok = sc_cond_init(&recorder->stream_cond);
if (!ok) {
goto error_queue_cond_destroy;
}
sc_queue_init(&recorder->queue); sc_queue_init(&recorder->queue);
recorder->stopped = false; recorder->stopped = false;
recorder->failed = false;
recorder->header_written = false; recorder->header_written = false;
recorder->codec = NULL;
recorder->format = format; recorder->format = format;
recorder->declared_frame_size = declared_frame_size; recorder->declared_frame_size = declared_frame_size;
@ -398,8 +444,19 @@ sc_recorder_init(struct sc_recorder *recorder, const char *filename,
recorder->packet_sink.ops = &ops; recorder->packet_sink.ops = &ops;
ok = sc_thread_create(&recorder->thread, run_recorder, "scrcpy-recorder",
recorder);
if (!ok) {
LOGE("Could not start recorder thread");
goto error_stream_cond_destroy;
}
return true; return true;
error_stream_cond_destroy:
sc_cond_destroy(&recorder->stream_cond);
error_queue_cond_destroy:
sc_cond_destroy(&recorder->queue_cond);
error_mutex_destroy: error_mutex_destroy:
sc_mutex_destroy(&recorder->mutex); sc_mutex_destroy(&recorder->mutex);
error_free_filename: error_free_filename:
@ -408,8 +465,23 @@ error_free_filename:
return false; return false;
} }
void
sc_recorder_stop(struct sc_recorder *recorder) {
sc_mutex_lock(&recorder->mutex);
recorder->stopped = true;
sc_cond_signal(&recorder->queue_cond);
sc_cond_signal(&recorder->stream_cond);
sc_mutex_unlock(&recorder->mutex);
}
void
sc_recorder_join(struct sc_recorder *recorder) {
sc_thread_join(&recorder->thread, NULL);
}
void void
sc_recorder_destroy(struct sc_recorder *recorder) { sc_recorder_destroy(struct sc_recorder *recorder) {
sc_cond_destroy(&recorder->stream_cond);
sc_cond_destroy(&recorder->queue_cond); sc_cond_destroy(&recorder->queue_cond);
sc_mutex_destroy(&recorder->mutex); sc_mutex_destroy(&recorder->mutex);
free(recorder->filename); free(recorder->filename);

View file

@ -31,10 +31,14 @@ struct sc_recorder {
sc_thread thread; sc_thread thread;
sc_mutex mutex; sc_mutex mutex;
sc_cond queue_cond; sc_cond queue_cond;
bool stopped; // set on recorder_close() // set on sc_recorder_stop(), packet_sink close or recording failure
bool failed; // set on packet write failure bool stopped;
struct sc_recorder_queue queue; struct sc_recorder_queue queue;
// wake up the recorder thread once the codec in known
sc_cond stream_cond;
const AVCodec *codec;
const struct sc_recorder_callbacks *cbs; const struct sc_recorder_callbacks *cbs;
void *cbs_userdata; void *cbs_userdata;
}; };
@ -50,6 +54,12 @@ sc_recorder_init(struct sc_recorder *recorder, const char *filename,
struct sc_size declared_frame_size, struct sc_size declared_frame_size,
const struct sc_recorder_callbacks *cbs, void *cbs_userdata); const struct sc_recorder_callbacks *cbs, void *cbs_userdata);
void
sc_recorder_stop(struct sc_recorder *recorder);
void
sc_recorder_join(struct sc_recorder *recorder);
void void
sc_recorder_destroy(struct sc_recorder *recorder); sc_recorder_destroy(struct sc_recorder *recorder);

View file

@ -660,6 +660,9 @@ end:
if (file_pusher_initialized) { if (file_pusher_initialized) {
sc_file_pusher_stop(&s->file_pusher); sc_file_pusher_stop(&s->file_pusher);
} }
if (recorder_initialized) {
sc_recorder_stop(&s->recorder);
}
if (screen_initialized) { if (screen_initialized) {
sc_screen_interrupt(&s->screen); sc_screen_interrupt(&s->screen);
} }
@ -706,6 +709,7 @@ end:
} }
if (recorder_initialized) { if (recorder_initialized) {
sc_recorder_join(&s->recorder);
sc_recorder_destroy(&s->recorder); sc_recorder_destroy(&s->recorder);
} }

View file

@ -19,6 +19,7 @@ struct sc_packet_sink {
}; };
struct sc_packet_sink_ops { struct sc_packet_sink_ops {
/* The codec instance is static, it is valid until the end of the program */
bool (*open)(struct sc_packet_sink *sink, const AVCodec *codec); bool (*open)(struct sc_packet_sink *sink, const AVCodec *codec);
void (*close)(struct sc_packet_sink *sink); void (*close)(struct sc_packet_sink *sink);
bool (*push)(struct sc_packet_sink *sink, const AVPacket *packet); bool (*push)(struct sc_packet_sink *sink, const AVPacket *packet);