Working discord -> ts audio, bad quality

Signed-off-by: Aron Heinecke <aron.heinecke@t-online.de>
This commit is contained in:
Aron Heinecke 2021-05-10 18:28:57 +02:00
parent a23b9f121b
commit d9cf674602
3 changed files with 99 additions and 24 deletions

View file

@ -122,7 +122,7 @@ async fn join(ctx: &Context, msg: &Message) -> CommandResult {
if let Ok(_) = conn_result { if let Ok(_) = conn_result {
// NOTE: this skips listening for the actual connection result. // NOTE: this skips listening for the actual connection result.
let channel: Arc<mpsc::Sender<OutPacket>>; let channel: crate::AudioBufferDiscord;
{ {
let data_read = ctx.data.read().await; let data_read = ctx.data.read().await;
channel = data_read.get::<ListenerHolder>().expect("Expected CommandCounter in TypeMap.").clone(); channel = data_read.get::<ListenerHolder>().expect("Expected CommandCounter in TypeMap.").clone();
@ -332,11 +332,11 @@ fn check_msg(result: SerenityResult<Message>) {
} }
struct Receiver{ struct Receiver{
sink: Arc<mpsc::Sender<OutPacket>>, sink: crate::AudioBufferDiscord,
} }
impl Receiver { impl Receiver {
pub fn new(voice_receiver: Arc<mpsc::Sender<OutPacket>>) -> Self { pub fn new(voice_receiver: crate::AudioBufferDiscord) -> Self {
// You can manage state here, such as a buffer of audio packet bytes so // You can manage state here, such as a buffer of audio packet bytes so
// you can later store them in intervals. // you can later store them in intervals.
Self { Self {
@ -384,12 +384,21 @@ impl VoiceEventHandler for Receiver {
Ctx::VoicePacket {audio, packet, payload_offset, payload_end_pad} => { Ctx::VoicePacket {audio, packet, payload_offset, payload_end_pad} => {
// An event which fires for every received audio packet, // An event which fires for every received audio packet,
// containing the decoded data. // containing the decoded data.
let data: &[u8] = &packet.payload.as_slice()[*payload_offset..(packet.payload.len()-payload_end_pad)]; // let data: &[u8] = &packet.payload.as_slice()[*payload_offset..(packet.payload.len()-payload_end_pad)];
let packet = OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusMusic, data }); // let packet = OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusVoice, data });
if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await { // if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await {
eprint!("Can't send voice to sender: {}",e); // eprint!("Can't send voice to sender: {}",e);
} // }
if let Some(audio) = audio { if let Some(audio) = audio {
{
let mut lock = self.sink.lock().await;
if let Some(buffer) = lock.get_mut(&packet.ssrc) {
buffer.extend(audio);
} else {
// TODO: can we skip this clone ?
let _ = lock.insert(packet.ssrc, audio.clone());
}
}
// println!("Audio packet's first 5 samples: {:?}", audio.get(..5.min(audio.len()))); // println!("Audio packet's first 5 samples: {:?}", audio.get(..5.min(audio.len())));
// // println!( // // println!(
// // "Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}", // // "Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}",
@ -416,7 +425,7 @@ impl VoiceEventHandler for Receiver {
Ctx::RtcpPacket {packet, payload_offset, payload_end_pad} => { Ctx::RtcpPacket {packet, payload_offset, payload_end_pad} => {
// An event which fires for every received rtcp packet, // An event which fires for every received rtcp packet,
// containing the call statistics and reporting information. // containing the call statistics and reporting information.
println!("RTCP packet received: {:?}", packet); //println!("RTCP packet received: {:?}", packet);
}, },
Ctx::ClientConnect( Ctx::ClientConnect(
ClientConnect {audio_ssrc, video_ssrc, user_id, ..} ClientConnect {audio_ssrc, video_ssrc, user_id, ..}

View file

@ -1,18 +1,21 @@
use std::{collections::HashMap, env, sync::Arc}; use std::{collections::HashMap, env, sync::Arc, time::Duration};
use serde::Deserialize; use serde::Deserialize;
use tsclientlib::{ClientId, Connection, DisconnectOptions, Identity, StreamItem}; use tsclientlib::{ClientId, Connection, DisconnectOptions, Identity, StreamItem};
use tsproto_packets::packets::{AudioData, CodecType, OutAudio, OutPacket}; use tsproto_packets::packets::{AudioData, CodecType, OutAudio, OutPacket};
use audiopus::coder::Encoder; use audiopus::coder::Encoder;
use futures::prelude::*; use futures::{lock::Mutex, prelude::*};
use sdl2::audio::{AudioCallback, AudioDevice, AudioSpec, AudioSpecDesired, AudioStatus}; use sdl2::audio::{AudioCallback, AudioDevice, AudioSpec, AudioSpecDesired, AudioStatus};
use sdl2::AudioSubsystem; use sdl2::AudioSubsystem;
use slog::{debug, info, o, Drain, Logger}; use slog::{debug, info, o, Drain, Logger};
use tokio::sync::mpsc; use tokio::{sync::mpsc, task};
use tokio::task::LocalSet; use tokio::task::LocalSet;
use anyhow::*; use anyhow::*;
use tsproto_packets::packets::{Direction, InAudioBuf};
use songbird::opus;
mod ts_voice; mod ts_voice;
mod discord; mod discord;
use tsproto_packets::packets::{Direction, InAudioBuf};
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct ConnectionId(u64); struct ConnectionId(u64);
@ -55,8 +58,11 @@ struct Config {
struct ListenerHolder; struct ListenerHolder;
//TODO: stop shooting myself in the knee with a mutex
type AudioBufferDiscord = Arc<Mutex<HashMap<u32,Vec<i16>>>>;
impl TypeMapKey for ListenerHolder { impl TypeMapKey for ListenerHolder {
type Value = Arc<mpsc::Sender<OutPacket>>; type Value = AudioBufferDiscord;
} }
#[tokio::main] #[tokio::main]
@ -95,8 +101,8 @@ async fn main() -> Result<()> {
.await .await
.expect("Err creating client"); .expect("Err creating client");
let (tx,mut rx) = mpsc::channel(50); let map = HashMap::new();
let voice_pipes: Arc<mpsc::Sender<OutPacket>> = Arc::new(tx); let voice_buffer: AudioBufferDiscord = Arc::new(Mutex::new(map));
{ {
// Open the data lock in write mode, so keys can be inserted to it. // Open the data lock in write mode, so keys can be inserted to it.
let mut data = client.data.write().await; let mut data = client.data.write().await;
@ -104,7 +110,7 @@ async fn main() -> Result<()> {
// The CommandCounter Value has the following type: // The CommandCounter Value has the following type:
// Arc<RwLock<HashMap<String, u64>>> // Arc<RwLock<HashMap<String, u64>>>
// So, we have to insert the same type to it. // So, we have to insert the same type to it.
data.insert::<ListenerHolder>(voice_pipes); data.insert::<ListenerHolder>(voice_buffer.clone());
} }
tokio::spawn(async move { tokio::spawn(async move {
@ -144,6 +150,17 @@ async fn main() -> Result<()> {
// a2t.set_playing(true); // a2t.set_playing(true);
// } // }
let mut interval = tokio::time::interval(Duration::from_millis(20));
// tokio::spawn(async {
// loop {
// interval.tick().await;
// if let Err(e) = con.send_audio() {
// println!("Failed to send audio to teamspeak: {}",e);
// }
// }
// });
loop { loop {
let t2a = audiodata.ts2a.clone(); let t2a = audiodata.ts2a.clone();
let events = con.events().try_for_each(|e| async { let events = con.events().try_for_each(|e| async {
@ -171,12 +188,18 @@ async fn main() -> Result<()> {
// break; // break;
// } // }
// } // }
send_audio = rx.recv() => { // send_audio = rx.recv() => {
if let Some(packet) = send_audio { // tokio::time::
con.send_audio(packet)?; // if let Some(packet) = send_audio {
} else { // con.send_audio(packet)?;
info!(logger, "Audio sending stream was canceled"); // } else {
break; // info!(logger, "Audio sending stream was canceled");
// break;
// }
// }
_send = interval.tick() => {
if let Some(processed) = process_audio(&voice_buffer).await {
con.send_audio(processed)?;
} }
} }
_ = tokio::signal::ctrl_c() => { break; } _ = tokio::signal::ctrl_c() => { break; }
@ -192,4 +215,48 @@ async fn main() -> Result<()> {
con.events().for_each(|_| future::ready(())).await; con.events().for_each(|_| future::ready(())).await;
println!("Disconnected"); println!("Disconnected");
Ok(()) Ok(())
}
async fn process_audio(voice_buffer: &AudioBufferDiscord) -> Option<OutPacket> {
let buffer_map;
{
let mut lock = voice_buffer.lock().await;
buffer_map = std::mem::replace(&mut *lock, HashMap::new());
}
if buffer_map.is_empty() {
return None;
}
let mut encoded = [0; 256];
let res = task::spawn_blocking(move || {
let start = std::time::Instant::now();
let mut data: Vec<i16> = Vec::new();
for buffer in buffer_map.values() {
for i in 0..buffer.len() {
if let Some(v) = data.get_mut(i) {
*v = *v + buffer[i];
} else {
data.push(buffer[i]);
}
}
}
//println!("Data size: {}",data.len());
let encoder = audiopus::coder::Encoder::new(
audiopus::SampleRate::Hz48000,
audiopus::Channels::Stereo,
audiopus::Application::Voip)
.expect("Can't construct encoder!");
let length = match encoder.encode(&data, &mut encoded) {
Err(e) => {eprintln!("Failed to encode voice: {}",e); return None;},
Ok(size) => size,
};
//println!("length size: {}",length);
let duration = start.elapsed().as_millis();
if duration > 15 {
eprintln!("Took too {}ms for processing audio!",duration);
}
Some(OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusMusic, data: &encoded[..length] }))
}).await.expect("Join error for audio processing thread!");
res
} }

View file

@ -91,7 +91,6 @@ impl AudioToTs {
encoder, encoder,
listener, listener,
volume, volume,
opus_output: [0; MAX_OPUS_FRAME_SIZE], opus_output: [0; MAX_OPUS_FRAME_SIZE],
} }
}).map_err(|e| format_err!("SDL error: {}", e)) }).map_err(|e| format_err!("SDL error: {}", e))