From d9cf674602cfddb92d1ec3da70c647346ae68015 Mon Sep 17 00:00:00 2001 From: Aron Heinecke Date: Mon, 10 May 2021 18:28:57 +0200 Subject: [PATCH] Working discord -> ts audio, bad quality Signed-off-by: Aron Heinecke --- src/discord.rs | 27 +++++++---- src/main.rs | 95 +++++++++++++++++++++++++++++++------ src/ts_voice/audio_to_ts.rs | 1 - 3 files changed, 99 insertions(+), 24 deletions(-) diff --git a/src/discord.rs b/src/discord.rs index 23fcfb6..f891fde 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -122,7 +122,7 @@ async fn join(ctx: &Context, msg: &Message) -> CommandResult { if let Ok(_) = conn_result { // NOTE: this skips listening for the actual connection result. - let channel: Arc>; + let channel: crate::AudioBufferDiscord; { let data_read = ctx.data.read().await; channel = data_read.get::().expect("Expected CommandCounter in TypeMap.").clone(); @@ -332,11 +332,11 @@ fn check_msg(result: SerenityResult) { } struct Receiver{ - sink: Arc>, + sink: crate::AudioBufferDiscord, } impl Receiver { - pub fn new(voice_receiver: Arc>) -> Self { + pub fn new(voice_receiver: crate::AudioBufferDiscord) -> Self { // You can manage state here, such as a buffer of audio packet bytes so // you can later store them in intervals. Self { @@ -384,12 +384,21 @@ impl VoiceEventHandler for Receiver { Ctx::VoicePacket {audio, packet, payload_offset, payload_end_pad} => { // An event which fires for every received audio packet, // containing the decoded data. - let data: &[u8] = &packet.payload.as_slice()[*payload_offset..(packet.payload.len()-payload_end_pad)]; - let packet = OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusMusic, data }); - if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await { - eprint!("Can't send voice to sender: {}",e); - } + // let data: &[u8] = &packet.payload.as_slice()[*payload_offset..(packet.payload.len()-payload_end_pad)]; + // let packet = OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusVoice, data }); + // if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await { + // eprint!("Can't send voice to sender: {}",e); + // } if let Some(audio) = audio { + { + let mut lock = self.sink.lock().await; + if let Some(buffer) = lock.get_mut(&packet.ssrc) { + buffer.extend(audio); + } else { + // TODO: can we skip this clone ? + let _ = lock.insert(packet.ssrc, audio.clone()); + } + } // println!("Audio packet's first 5 samples: {:?}", audio.get(..5.min(audio.len()))); // // println!( // // "Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}", @@ -416,7 +425,7 @@ impl VoiceEventHandler for Receiver { Ctx::RtcpPacket {packet, payload_offset, payload_end_pad} => { // An event which fires for every received rtcp packet, // containing the call statistics and reporting information. - println!("RTCP packet received: {:?}", packet); + //println!("RTCP packet received: {:?}", packet); }, Ctx::ClientConnect( ClientConnect {audio_ssrc, video_ssrc, user_id, ..} diff --git a/src/main.rs b/src/main.rs index c53e5f2..df122c2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,21 @@ -use std::{collections::HashMap, env, sync::Arc}; +use std::{collections::HashMap, env, sync::Arc, time::Duration}; use serde::Deserialize; use tsclientlib::{ClientId, Connection, DisconnectOptions, Identity, StreamItem}; use tsproto_packets::packets::{AudioData, CodecType, OutAudio, OutPacket}; use audiopus::coder::Encoder; -use futures::prelude::*; +use futures::{lock::Mutex, prelude::*}; use sdl2::audio::{AudioCallback, AudioDevice, AudioSpec, AudioSpecDesired, AudioStatus}; use sdl2::AudioSubsystem; use slog::{debug, info, o, Drain, Logger}; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, task}; use tokio::task::LocalSet; use anyhow::*; +use tsproto_packets::packets::{Direction, InAudioBuf}; +use songbird::opus; + mod ts_voice; mod discord; -use tsproto_packets::packets::{Direction, InAudioBuf}; + #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] struct ConnectionId(u64); @@ -55,8 +58,11 @@ struct Config { struct ListenerHolder; +//TODO: stop shooting myself in the knee with a mutex +type AudioBufferDiscord = Arc>>>; + impl TypeMapKey for ListenerHolder { - type Value = Arc>; + type Value = AudioBufferDiscord; } #[tokio::main] @@ -95,8 +101,8 @@ async fn main() -> Result<()> { .await .expect("Err creating client"); - let (tx,mut rx) = mpsc::channel(50); - let voice_pipes: Arc> = Arc::new(tx); + let map = HashMap::new(); + let voice_buffer: AudioBufferDiscord = Arc::new(Mutex::new(map)); { // Open the data lock in write mode, so keys can be inserted to it. let mut data = client.data.write().await; @@ -104,7 +110,7 @@ async fn main() -> Result<()> { // The CommandCounter Value has the following type: // Arc>> // So, we have to insert the same type to it. - data.insert::(voice_pipes); + data.insert::(voice_buffer.clone()); } tokio::spawn(async move { @@ -144,6 +150,17 @@ async fn main() -> Result<()> { // a2t.set_playing(true); // } + let mut interval = tokio::time::interval(Duration::from_millis(20)); + + // tokio::spawn(async { + // loop { + // interval.tick().await; + // if let Err(e) = con.send_audio() { + // println!("Failed to send audio to teamspeak: {}",e); + // } + // } + // }); + loop { let t2a = audiodata.ts2a.clone(); let events = con.events().try_for_each(|e| async { @@ -171,12 +188,18 @@ async fn main() -> Result<()> { // break; // } // } - send_audio = rx.recv() => { - if let Some(packet) = send_audio { - con.send_audio(packet)?; - } else { - info!(logger, "Audio sending stream was canceled"); - break; + // send_audio = rx.recv() => { + // tokio::time:: + // if let Some(packet) = send_audio { + // con.send_audio(packet)?; + // } else { + // info!(logger, "Audio sending stream was canceled"); + // break; + // } + // } + _send = interval.tick() => { + if let Some(processed) = process_audio(&voice_buffer).await { + con.send_audio(processed)?; } } _ = tokio::signal::ctrl_c() => { break; } @@ -192,4 +215,48 @@ async fn main() -> Result<()> { con.events().for_each(|_| future::ready(())).await; println!("Disconnected"); Ok(()) +} + +async fn process_audio(voice_buffer: &AudioBufferDiscord) -> Option { + let buffer_map; + { + let mut lock = voice_buffer.lock().await; + buffer_map = std::mem::replace(&mut *lock, HashMap::new()); + } + if buffer_map.is_empty() { + return None; + } + let mut encoded = [0; 256]; + let res = task::spawn_blocking(move || { + let start = std::time::Instant::now(); + let mut data: Vec = Vec::new(); + for buffer in buffer_map.values() { + for i in 0..buffer.len() { + if let Some(v) = data.get_mut(i) { + *v = *v + buffer[i]; + } else { + data.push(buffer[i]); + } + } + } + //println!("Data size: {}",data.len()); + let encoder = audiopus::coder::Encoder::new( + audiopus::SampleRate::Hz48000, + audiopus::Channels::Stereo, + audiopus::Application::Voip) + .expect("Can't construct encoder!"); + + let length = match encoder.encode(&data, &mut encoded) { + Err(e) => {eprintln!("Failed to encode voice: {}",e); return None;}, + Ok(size) => size, + }; + //println!("length size: {}",length); + let duration = start.elapsed().as_millis(); + if duration > 15 { + eprintln!("Took too {}ms for processing audio!",duration); + } + + Some(OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusMusic, data: &encoded[..length] })) + }).await.expect("Join error for audio processing thread!"); + res } \ No newline at end of file diff --git a/src/ts_voice/audio_to_ts.rs b/src/ts_voice/audio_to_ts.rs index 37e20e4..66fe42d 100644 --- a/src/ts_voice/audio_to_ts.rs +++ b/src/ts_voice/audio_to_ts.rs @@ -91,7 +91,6 @@ impl AudioToTs { encoder, listener, volume, - opus_output: [0; MAX_OPUS_FRAME_SIZE], } }).map_err(|e| format_err!("SDL error: {}", e))