Fix decoder issues with RTP extensions

Signed-off-by: Aron Heinecke <aron.heinecke@t-online.de>
This commit is contained in:
Aron Heinecke 2021-08-05 16:49:54 +02:00
parent 74377b4816
commit 7c77c2cd52
3 changed files with 45 additions and 17 deletions

View file

@ -1,7 +1,11 @@
//! Discord handler //! Discord handler
use std::sync::Arc;
use audiopus::{Channels, SampleRate};
use audiopus::coder::Decoder;
use serde::Deserialize; use serde::Deserialize;
use serenity::prelude::Mentionable; use serenity::prelude::{Mentionable, Mutex};
use slog::error; use slog::error;
// This trait adds the `register_songbird` and `register_songbird_with` methods // This trait adds the `register_songbird` and `register_songbird_with` methods
@ -24,6 +28,8 @@ use serenity::{
model::{channel::Message, gateway::Ready}, model::{channel::Message, gateway::Ready},
Result as SerenityResult, Result as SerenityResult,
}; };
use songbird::packet::PacketSize;
use songbird::packet::rtp::RtpExtensionPacket;
use songbird::{ use songbird::{
model::payload::{ClientConnect, ClientDisconnect, Speaking}, model::payload::{ClientConnect, ClientDisconnect, Speaking},
CoreEvent, CoreEvent,
@ -330,6 +336,7 @@ fn check_msg(result: SerenityResult<Message>) {
struct Receiver{ struct Receiver{
sink: crate::AudioBufferDiscord, sink: crate::AudioBufferDiscord,
decoder: Arc<Mutex<Decoder>>,
} }
impl Receiver { impl Receiver {
@ -338,6 +345,7 @@ impl Receiver {
// you can later store them in intervals. // you can later store them in intervals.
Self { Self {
sink: voice_receiver, sink: voice_receiver,
decoder: Arc::new(Mutex::new(Decoder::new(SampleRate::Hz48000, Channels::Stereo).unwrap()))
} }
} }
} }
@ -383,19 +391,38 @@ impl VoiceEventHandler for Receiver {
// get raw opus package, we don't decode here and leave that to the AudioHandler // get raw opus package, we don't decode here and leave that to the AudioHandler
let last_bytes = packet.payload.len() - payload_end_pad; let last_bytes = packet.payload.len() - payload_end_pad;
let opus_slice = &packet.payload[*payload_offset..last_bytes]; let data = &packet.payload[*payload_offset..last_bytes];
let start = if packet.extension != 0 {
match RtpExtensionPacket::new(data) {
Some(v) => v.packet_size(),
None => {
eprintln!("Extension packet indicated, but insufficient space.");
return None;
}
}
} else {
0
};
let opus_slice = &data[start..];
let dur; let dur;
{ {
let time = std::time::Instant::now(); let mut lock_decoder = self.decoder.lock().await;
let mut lock = self.sink.lock().await; let mut decoded: [i16; 48000 *2 ] = [0; 48000 * 2];
dur = time.elapsed(); if let Err(e) = lock_decoder.decode(Some(opus_slice), &mut decoded[..], false) {
if let Err(e) = lock.handle_packet(packet.ssrc, packet.sequence.0.0, opus_slice.to_vec()) { eprintln!("Failed to handle Discord voice packet: {:?}",e);
eprintln!("Failed to handle Discord voice packet: {}",e); } else {
let time = std::time::Instant::now();
let mut lock = self.sink.lock().await;
dur = time.elapsed();
if let Err(e) = lock.handle_packet(packet.ssrc, packet.sequence.0.0, opus_slice.to_vec()) {
eprintln!("Failed to handle Discord voice packet: {}",e);
}
if dur.as_millis() > 1 {
eprintln!("Acquiring lock took {}ms",dur.as_millis());
}
} }
} }
if dur.as_millis() > 1 {
eprintln!("Acquiring lock took {}ms",dur.as_millis());
}
}, },
Ctx::RtcpPacket {packet, payload_offset, payload_end_pad} => { Ctx::RtcpPacket {packet, payload_offset, payload_end_pad} => {
// An event which fires for every received rtcp packet, // An event which fires for every received rtcp packet,

View file

@ -17,7 +17,7 @@ use std::hash::Hash;
use audiopus::coder::Decoder; use audiopus::coder::Decoder;
use audiopus::{packet, Channels, SampleRate}; use audiopus::{packet, Channels, SampleRate};
use slog::{debug, o, trace, warn, Logger}; use slog::{Logger, debug, info, o, trace, warn};
use tsclientlib::audio::Error; use tsclientlib::audio::Error;
use tsproto_packets::packets::{AudioData, CodecType, InAudioBuf}; use tsproto_packets::packets::{AudioData, CodecType, InAudioBuf};
@ -244,7 +244,7 @@ impl AudioQueue {
} }
fn decode_packet(&mut self, packet: Option<&QueuePacket>, fec: bool) -> Result<()> { fn decode_packet(&mut self, packet: Option<&QueuePacket>, fec: bool) -> Result<()> {
trace!(self.logger, "Decoding packet"; "has_packet" => packet.is_some(), "fec" => fec); debug!(self.logger, "Decoding packet"; "has_packet" => packet.is_some(), "fec" => fec);
let packet_data; let packet_data;
let len; let len;
if let Some(p) = packet { if let Some(p) = packet {
@ -347,7 +347,7 @@ impl AudioQueue {
cur_id cur_id
); );
// Packet loss // Packet loss
debug!(self.logger, "Audio packet loss"; "need" => cur_id, "have" => packet.id); info!(self.logger, "Audio packet loss"; "need" => cur_id, "have" => packet.id);
if packet.id == self.next_id { if packet.id == self.next_id {
// Can use forward-error-correction // Can use forward-error-correction
self.decode_packet(Some(&packet), true)?; self.decode_packet(Some(&packet), true)?;

View file

@ -92,7 +92,7 @@ impl TypeMapKey for ListenerHolder {
/// teamspeak audio fragment timer /// teamspeak audio fragment timer
/// We want to run every 20ms, but we only get ~1ms correctness /// We want to run every 20ms, but we only get ~1ms correctness
const TICK_TIME: u64 = 18; const TICK_TIME: u64 = 20;
const FRAME_SIZE_MS: usize = 20; const FRAME_SIZE_MS: usize = 20;
const SAMPLE_RATE: usize = 48000; const SAMPLE_RATE: usize = 48000;
const STEREO_20MS: usize = SAMPLE_RATE * 2 * FRAME_SIZE_MS / 1000; const STEREO_20MS: usize = SAMPLE_RATE * 2 * FRAME_SIZE_MS / 1000;
@ -104,6 +104,7 @@ const I16_CONVERSION_DIVIDER: f32 = 0x8000 as f32;
const MAX_OPUS_FRAME_SIZE: usize = 1275; const MAX_OPUS_FRAME_SIZE: usize = 1275;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
dbg!(STEREO_20MS);
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
// init logging stuff used by tsclientlib // init logging stuff used by tsclientlib
let config: Config = toml::from_str(&std::fs::read_to_string(".credentials.toml").unwrap()).unwrap(); let config: Config = toml::from_str(&std::fs::read_to_string(".credentials.toml").unwrap()).unwrap();
@ -127,7 +128,7 @@ async fn main() -> Result<()> {
let songbird = Songbird::serenity(); let songbird = Songbird::serenity();
songbird.set_config( songbird.set_config(
DriverConfig::default() DriverConfig::default()
.decode_mode(DecodeMode::Decode) .decode_mode(DecodeMode::Decrypt)
); );
// init discord client // init discord client
@ -255,12 +256,12 @@ async fn process_discord_audio(voice_buffer: &AudioBufferDiscord, encoder: &Arc<
// buffer_map = std::mem::replace(&mut *lock, HashMap::new()); // buffer_map = std::mem::replace(&mut *lock, HashMap::new());
// } // }
let mut data = [0.0; STEREO_20MS]; let mut data = [0.0; 1920];
{ {
let mut lock = voice_buffer.lock().await; let mut lock = voice_buffer.lock().await;
lock.fill_buffer(&mut data); lock.fill_buffer(&mut data);
} }
let mut encoded = [0; 1024]; let mut encoded = [0; 1920];
let encoder_c = encoder.clone(); let encoder_c = encoder.clone();
// don't block the async runtime // don't block the async runtime
let res = task::spawn_blocking(move || { let res = task::spawn_blocking(move || {