diff --git a/Cargo.lock b/Cargo.lock index 1ef3797..2895a5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -227,6 +227,12 @@ version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" +[[package]] +name = "byte-slice-cast" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65c1bf4a04a88c54f589125563643d773f3254b5c38571395e2b591c693bbc81" + [[package]] name = "byteorder" version = "1.4.3" @@ -3175,6 +3181,7 @@ version = "0.1.0" dependencies = [ "anyhow", "audiopus", + "byte-slice-cast", "futures", "sdl2", "serde", diff --git a/Cargo.toml b/Cargo.toml index c3c3ba0..ed5e447 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,9 @@ edition = "2018" toml = "0.5" serde = { version = "1.0", features = ["derive"] } +# pushing [f32] to [u8] and vice verca +byte-slice-cast = "1" + # tokio tracing from songbird tracing = "0.1" tracing-subscriber = "0.2" @@ -53,4 +56,4 @@ features = ["client", "standard_framework", "voice","native_tls_backend"] [dependencies.tokio] version = "1.0" -features = ["macros", "rt-multi-thread","signal"] \ No newline at end of file +features = ["macros", "rt-multi-thread","signal", "sync"] \ No newline at end of file diff --git a/src/discord.rs b/src/discord.rs index 38b67e2..e2e62bf 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -12,7 +12,7 @@ use serenity::prelude::Mentionable; // This trait adds the `register_songbird` and `register_songbird_with` methods // to the client builder below, making it easy to install this voice client. // The voice client can be retrieved in any command using `songbird::get(ctx).await`. -use songbird::SerenityInit; +use songbird::{SerenityInit, input::Input}; // Import the `Context` to handle commands. use serenity::client::Context; @@ -123,12 +123,16 @@ async fn join(ctx: &Context, msg: &Message) -> CommandResult { if let Ok(_) = conn_result { // NOTE: this skips listening for the actual connection result. let channel: crate::AudioBufferDiscord; + let ts_buffer: crate::TsToDiscordPipeline; { let data_read = ctx.data.read().await; - channel = data_read.get::().expect("Expected CommandCounter in TypeMap.").clone(); + let (ts_buf,chan) = data_read.get::().expect("Expected CommandCounter in TypeMap.").clone(); + channel = chan; + ts_buffer = ts_buf; } let mut handler = handler_lock.lock().await; - + let discord_input = Input::float_pcm(true, songbird::input::Reader::Extension(Box::new(ts_buffer.clone()))); + handler.play_only_source(discord_input); handler.add_global_event( CoreEvent::SpeakingStateUpdate.into(), Receiver::new(channel.clone()), @@ -395,7 +399,7 @@ impl VoiceEventHandler for Receiver { let mut lock = self.sink.lock().await; let dur = time.elapsed(); if dur.as_millis() > 1 { - eprintln!("Akquiring lock took {}ms",dur.as_millis()); + eprintln!("Acquiring lock took {}ms",dur.as_millis()); } if let Some(buffer) = lock.get_mut(&packet.ssrc) { buffer.extend(audio); diff --git a/src/main.rs b/src/main.rs index 75cda73..02b3147 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,24 +1,27 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, io::{Read, copy}, mem::size_of, sync::Arc, time::Duration}; +use byte_slice_cast::{AsByteSlice, AsMutByteSlice}; use serde::Deserialize; -use tsclientlib::{ClientId, Connection, DisconnectOptions, Identity, StreamItem}; +use tsclientlib::{ClientId, Connection, DisconnectOptions, Identity, StreamItem, audio::AudioHandler}; use tsproto_packets::packets::{AudioData, CodecType, OutAudio, OutPacket}; use audiopus::coder::Encoder; -use futures::{lock::Mutex, prelude::*}; +use futures::prelude::*; use slog::{debug, o, Drain, Logger}; -use tokio::{task}; +use tokio::task; use tokio::task::LocalSet; +use tokio::sync::Mutex; use anyhow::*; + + mod ts_voice; mod discord; - #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] struct ConnectionId(u64); // This trait adds the `register_songbird` and `register_songbird_with` methods // to the client builder below, making it easy to install this voice client. // The voice client can be retrieved in any command using `songbird::get(ctx).await`. -use songbird::{SerenityInit, Songbird}; +use songbird::{SerenityInit, Songbird, input::Input}; use songbird::driver::{Config as DriverConfig, DecodeMode}; // Import the `Context` to handle commands. @@ -56,8 +59,42 @@ struct ListenerHolder; //TODO: stop shooting myself in the knee with a mutex type AudioBufferDiscord = Arc>>>; + +type TsVoiceId = (ConnectionId, ClientId); +type TsAudioHandler = tsclientlib::audio::AudioHandler; + +#[derive(Clone)] +struct TsToDiscordPipeline { + data: Arc>, +} + +impl TsToDiscordPipeline { + pub fn new(logger: Logger) -> Self { + Self { + data: Arc::new(std::sync::Mutex::new(TsAudioHandler::new(logger))) + } + } +} + +impl Read for TsToDiscordPipeline { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + // TODO: can't we support async read for songbird ? this is kinda bad.. + let mut lock = self.data.lock().expect("Can't lock ts voice buffer!"); + + // and this is really ugly.. read only works for u8, but we get an f32 and need to convert that without chaning AudioHandlers API + // also Read for stuff that specifies to use f32 is kinda meh + let len = buf.len() / size_of::(); + let mut wtr: Vec = vec![0.0; len]; + lock.fill_buffer(wtr.as_mut_slice()); + let slice = wtr.as_byte_slice(); + buf.copy_from_slice(slice); + + Ok(buf.len()) + } +} + impl TypeMapKey for ListenerHolder { - type Value = AudioBufferDiscord; + type Value = (TsToDiscordPipeline,AudioBufferDiscord); } const TICK_TIME: u64 = 15; @@ -101,8 +138,11 @@ async fn main() -> Result<()> { .await .expect("Err creating client"); + let ts_voice_logger = logger.new(o!("pipeline" => "voice-ts")); + let teamspeak_voice_handler = TsToDiscordPipeline::new(logger); + let map = HashMap::new(); - let voice_buffer: AudioBufferDiscord = Arc::new(Mutex::new(map)); + let discord_voice_buffer: AudioBufferDiscord = Arc::new(Mutex::new(map)); { // Open the data lock in write mode, so keys can be inserted to it. let mut data = client.data.write().await; @@ -110,7 +150,7 @@ async fn main() -> Result<()> { // The CommandCounter Value has the following type: // Arc>> // So, we have to insert the same type to it. - data.insert::(voice_buffer.clone()); + data.insert::((teamspeak_voice_handler.clone(),discord_voice_buffer.clone())); } tokio::spawn(async move { @@ -118,8 +158,6 @@ async fn main() -> Result<()> { }); let con_id = ConnectionId(0); - let local_set = LocalSet::new(); - let audiodata = ts_voice::start(logger.clone(), &local_set)?; let con_config = Connection::build(config.teamspeak_server) .log_commands(config.verbose >= 1) @@ -141,14 +179,6 @@ async fn main() -> Result<()> { if let Some(r) = r { r?; } - - // let (send, mut recv) = mpsc::channel(5); - // { - // let mut a2t = audiodata.a2ts.lock().unwrap(); - // a2t.set_listener(send); - // a2t.set_volume(config.volume); - // a2t.set_playing(true); - // } let encoder = audiopus::coder::Encoder::new( audiopus::SampleRate::Hz48000, audiopus::Channels::Stereo, @@ -157,17 +187,7 @@ async fn main() -> Result<()> { let encoder = Arc::new(Mutex::new(encoder)); let mut interval = tokio::time::interval(Duration::from_millis(TICK_TIME)); - // tokio::spawn(async { - // loop { - // interval.tick().await; - // if let Err(e) = con.send_audio() { - // println!("Failed to send audio to teamspeak: {}",e); - // } - // } - // }); - loop { - let t2a = audiodata.ts2a.clone(); let events = con.events().try_for_each(|e| async { if let StreamItem::Audio(packet) = e { let from = ClientId(match packet.data().data() { @@ -176,10 +196,11 @@ async fn main() -> Result<()> { _ => panic!("Can only handle S2C packets but got a C2S packet"), }); - // let mut t2a = t2a.lock().unwrap(); - // if let Err(e) = t2a.play_packet((con_id, from), packet) { - // debug!(logger, "Failed to play packet"; "error" => %e); - // } + let mut ts_voice: std::sync::MutexGuard = teamspeak_voice_handler.data.lock().expect("Can't lock ts audio buffer!"); + if let Err(e) = ts_voice.handle_packet((con_id, from), packet) { + //debug!(logger, "Failed to play packet"; "error" => %e); + eprintln!("Failed to play TS_Voice packet {}",e); + } } Ok(()) }); @@ -209,7 +230,7 @@ async fn main() -> Result<()> { // eprintln!("Tick took {}ms",dur.as_millis()); // } let start = std::time::Instant::now(); - if let Some(processed) = process_audio(&voice_buffer,&encoder).await { + if let Some(processed) = process_audio(&discord_voice_buffer,&encoder).await { con.send_audio(processed)?; let dur = start.elapsed(); if dur >= Duration::from_millis(1) {