From 4bb9f7d848572d26821f64270d83a7598af80a6a Mon Sep 17 00:00:00 2001 From: Aron Heinecke Date: Sat, 22 May 2021 19:25:58 +0200 Subject: [PATCH] Document a bunch of stuff on the main Signed-off-by: Aron Heinecke --- src/discord.rs | 26 ++------------------------ src/main.rs | 38 ++++++++++++++++++++++++++++---------- 2 files changed, 30 insertions(+), 34 deletions(-) diff --git a/src/discord.rs b/src/discord.rs index b638162..c0ecabb 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -1,3 +1,5 @@ +//! Discord handler + use serde::Deserialize; use serenity::prelude::Mentionable; @@ -377,11 +379,6 @@ impl VoiceEventHandler for Receiver { Ctx::VoicePacket {audio, packet, payload_offset, payload_end_pad} => { // An event which fires for every received audio packet, // containing the decoded data. - // let data: &[u8] = &packet.payload.as_slice()[*payload_offset..(packet.payload.len()-payload_end_pad)]; - // let packet = OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusVoice, data }); - // if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await { - // eprint!("Can't send voice to sender: {}",e); - // } if let Some(audio) = audio { { let time = std::time::Instant::now(); @@ -397,25 +394,6 @@ impl VoiceEventHandler for Receiver { let _ = lock.insert(packet.ssrc, audio.clone()); } } - // println!("Audio packet's first 5 samples: {:?}", audio.get(..5.min(audio.len()))); - // // println!( - // // "Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}", - // // packet.sequence.0, - // // audio.len() * std::mem::size_of::(), - // // packet.payload.len(), - // // packet.ssrc, - // // ); - // let mut values_converted = Vec::with_capacity(2*audio.len()); - // for value in audio { - // // maybe "be" ? - // // TODO: we could optimize this, data isn't directly used - // values_converted.extend(&value.to_le_bytes()); - // } - // let packet = - // OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusMusic, data: &values_converted }); - // if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await { - // eprint!("Can't send voice to sender: {}",e); - // } } else { println!("RTP packet, but no audio. Driver may not be configured to decode."); } diff --git a/src/main.rs b/src/main.rs index a6f8485..bbf42e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -89,7 +89,9 @@ impl TypeMapKey for ListenerHolder { type Value = (TsToDiscordPipeline,AudioBufferDiscord); } -const TICK_TIME: u64 = 15; +/// teamspeak audio fragment timer +/// We want to run every 20ms, but we only get ~1ms correctness +const TICK_TIME: u64 = 18; const FRAME_SIZE_MS: usize = 20; const STEREO_20MS: usize = 48000 * 2 * FRAME_SIZE_MS / 1000; /// The maximum size of an opus frame is 1275 as from RFC6716. @@ -97,7 +99,7 @@ const MAX_OPUS_FRAME_SIZE: usize = 1275; #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt::init(); - + // init logging stuff used by tsclientlib let config: Config = toml::from_str(&std::fs::read_to_string(".credentials.toml").unwrap()).unwrap(); let logger = { let decorator = slog_term::TermDecorator::new().build(); @@ -107,7 +109,7 @@ async fn main() -> Result<()> { Logger::root(drain, o!()) }; - + // init discord framework let framework = StandardFramework::new() .configure(|c| c .prefix("~")) @@ -122,7 +124,7 @@ async fn main() -> Result<()> { .decode_mode(DecodeMode::Decode) ); - + // init discord client let mut client = Client::builder(&config.discord_token) .event_handler(discord::Handler) .framework(framework) @@ -130,11 +132,14 @@ async fn main() -> Result<()> { .await .expect("Err creating client"); + // init teamspeak -> discord pipeline let ts_voice_logger = logger.new(o!("pipeline" => "voice-ts")); let teamspeak_voice_handler = TsToDiscordPipeline::new(ts_voice_logger); + // init discord -> teamspeak pipeline let map = HashMap::new(); let discord_voice_buffer: AudioBufferDiscord = Arc::new(Mutex::new(map)); + // stuff discord -> teamspeak pipeline into discord context for retrieval inside the client { // Open the data lock in write mode, so keys can be inserted to it. let mut data = client.data.write().await; @@ -145,24 +150,27 @@ async fn main() -> Result<()> { data.insert::((teamspeak_voice_handler.clone(),discord_voice_buffer.clone())); } + // spawn client runner tokio::spawn(async move { let _ = client.start().await.map_err(|why| println!("Client ended: {:?}", why)); }); let con_id = ConnectionId(0); + // configure teamspeak client let con_config = Connection::build(config.teamspeak_server) .log_commands(config.verbose >= 1) .log_packets(config.verbose >= 2) .log_udp_packets(config.verbose >= 3); - // Optionally set the key of this client, otherwise a new key is generated. + // teamspeak: Optionally set the key of this client, otherwise a new key is generated. let id = Identity::new_from_str(&config.teamspeak_identity).expect("Can't load identity!"); let con_config = con_config.identity(id); - // Connect + // Connect teamspeak client let mut con = con_config.connect()?; + // todo: something something discord connection events? let r = con .events() .try_filter(|e| future::ready(matches!(e, StreamItem::BookEvents(_)))) @@ -171,16 +179,23 @@ async fn main() -> Result<()> { if let Some(r) = r { r?; } + + // init discord -> teamspeak opus encoder let encoder = audiopus::coder::Encoder::new( audiopus::SampleRate::Hz48000, audiopus::Channels::Stereo, audiopus::Application::Voip) .expect("Can't construct encoder!"); + // we have to stuff this inside an arc-mutex to avoid lifetime shenanigans let encoder = Arc::new(Mutex::new(encoder)); + + // teamspeak playback timer let mut interval = tokio::time::interval(Duration::from_millis(TICK_TIME)); loop { + // handle teamspeak events let events = con.events().try_for_each(|e| async { + // handle teamspeak audio packets if let StreamItem::Audio(packet) = e { let from = ClientId(match packet.data().data() { AudioData::S2C { from, .. } => *from, @@ -189,17 +204,19 @@ async fn main() -> Result<()> { }); let mut ts_voice: std::sync::MutexGuard = teamspeak_voice_handler.data.lock().expect("Can't lock ts audio buffer!"); + // feed mixer+jitter buffer, consumed by discord if let Err(e) = ts_voice.handle_packet((con_id, from), packet) { debug!(logger, "Failed to play TS_Voice packet"; "error" => %e); } } Ok(()) }); - // Wait for ctrl + c + // Wait for ctrl + c and run everything else, end on who ever stops first tokio::select! { _send = interval.tick() => { let start = std::time::Instant::now(); - if let Some(processed) = process_audio(&discord_voice_buffer,&encoder).await { + // send audio frame to teamspeak + if let Some(processed) = process_discord_audio(&discord_voice_buffer,&encoder).await { con.send_audio(processed)?; let dur = start.elapsed(); if dur >= Duration::from_millis(1) { @@ -223,8 +240,9 @@ async fn main() -> Result<()> { } - -async fn process_audio(voice_buffer: &AudioBufferDiscord, encoder: &Arc>) -> Option { +/// Create an audio frame for consumption by teamspeak. +/// Merges all streams and converts them to opus +async fn process_discord_audio(voice_buffer: &AudioBufferDiscord, encoder: &Arc>) -> Option { let mut buffer_map; { let mut lock = voice_buffer.lock().await;