From a23b9f121bf6a3b7ba25b490c0e50e235af458a8 Mon Sep 17 00:00:00 2001 From: Aron Heinecke Date: Mon, 10 May 2021 14:01:35 +0200 Subject: [PATCH] Discord -> teamspeak pipe without audio hearable Teamspeak bot displays "talking" but nothing can be heard. Just piping opus 1:1 from discord to ts. Won't work for >1 person in a channel. Signed-off-by: Aron Heinecke --- src/discord.rs | 183 ++++++++++++++++++++++++++++++++++++++++++++++++- src/main.rs | 64 +++++++++++++---- 2 files changed, 231 insertions(+), 16 deletions(-) diff --git a/src/discord.rs b/src/discord.rs index a88be3c..23fcfb6 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -1,11 +1,13 @@ -use std::env; +use std::{env, sync::Arc, time::Duration}; use serde::Deserialize; +use tokio::sync::mpsc; use tsclientlib::{ClientId, Connection, DisconnectOptions, Identity, StreamItem}; -use tsproto_packets::packets::{AudioData, CodecType, OutAudio, OutPacket}; +use tsproto_packets::packets::{AudioData, CodecType, Direction, OutAudio, OutPacket}; use audiopus::coder::Encoder; use futures::prelude::*; use sdl2::audio::{AudioCallback, AudioDevice, AudioSpec, AudioSpecDesired, AudioStatus}; use sdl2::AudioSubsystem; +use serenity::prelude::Mentionable; // This trait adds the `register_songbird` and `register_songbird_with` methods // to the client builder below, making it easy to install this voice client. @@ -28,6 +30,17 @@ use serenity::{ model::{channel::Message, gateway::Ready}, Result as SerenityResult, }; +use songbird::{ + driver::{Config as DriverConfig, DecodeMode}, + model::payload::{ClientConnect, ClientDisconnect, Speaking}, + CoreEvent, + Event, + EventContext, + EventHandler as VoiceEventHandler, + Songbird, +}; + +use crate::ListenerHolder; pub(crate) struct Handler; @@ -105,7 +118,51 @@ async fn join(ctx: &Context, msg: &Message) -> CommandResult { let manager = songbird::get(ctx).await .expect("Songbird Voice client placed in at initialisation.").clone(); - let _handler = manager.join(guild_id, connect_to).await; + let (handler_lock, conn_result) = manager.join(guild_id, connect_to).await; + + if let Ok(_) = conn_result { + // NOTE: this skips listening for the actual connection result. + let channel: Arc>; + { + let data_read = ctx.data.read().await; + channel = data_read.get::().expect("Expected CommandCounter in TypeMap.").clone(); + } + let mut handler = handler_lock.lock().await; + + handler.add_global_event( + CoreEvent::SpeakingStateUpdate.into(), + Receiver::new(channel.clone()), + ); + + handler.add_global_event( + CoreEvent::SpeakingUpdate.into(), + Receiver::new(channel.clone()), + ); + + handler.add_global_event( + CoreEvent::VoicePacket.into(), + Receiver::new(channel.clone()), + ); + + handler.add_global_event( + CoreEvent::RtcpPacket.into(), + Receiver::new(channel.clone()), + ); + + handler.add_global_event( + CoreEvent::ClientConnect.into(), + Receiver::new(channel.clone()), + ); + + handler.add_global_event( + CoreEvent::ClientDisconnect.into(), + Receiver::new(channel), + ); + + check_msg(msg.channel_id.say(&ctx.http, &format!("Joined {}", connect_to.mention())).await); + } else { + check_msg(msg.channel_id.say(&ctx.http, "Error joining the channel").await); + } Ok(()) } @@ -272,4 +329,124 @@ fn check_msg(result: SerenityResult) { if let Err(why) = result { println!("Error sending message: {:?}", why); } +} + +struct Receiver{ + sink: Arc>, +} + +impl Receiver { + pub fn new(voice_receiver: Arc>) -> Self { + // You can manage state here, such as a buffer of audio packet bytes so + // you can later store them in intervals. + Self { + sink: voice_receiver, + } + } +} + +#[async_trait] +impl VoiceEventHandler for Receiver { + #[allow(unused_variables)] + async fn act(&self, ctx: &EventContext<'_>) -> Option { + use EventContext as Ctx; + match ctx { + Ctx::SpeakingStateUpdate( + Speaking {speaking, ssrc, user_id, ..} + ) => { + // Discord voice calls use RTP, where every sender uses a randomly allocated + // *Synchronisation Source* (SSRC) to allow receivers to tell which audio + // stream a received packet belongs to. As this number is not derived from + // the sender's user_id, only Discord Voice Gateway messages like this one + // inform us about which random SSRC a user has been allocated. Future voice + // packets will contain *only* the SSRC. + // + // You can implement logic here so that you can differentiate users' + // SSRCs and map the SSRC to the User ID and maintain this state. + // Using this map, you can map the `ssrc` in `voice_packet` + // to the user ID and handle their audio packets separately. + println!( + "Speaking state update: user {:?} has SSRC {:?}, using {:?}", + user_id, + ssrc, + speaking, + ); + }, + Ctx::SpeakingUpdate {ssrc, speaking} => { + // You can implement logic here which reacts to a user starting + // or stopping speaking. + println!( + "Source {} has {} speaking.", + ssrc, + if *speaking {"started"} else {"stopped"}, + ); + }, + Ctx::VoicePacket {audio, packet, payload_offset, payload_end_pad} => { + // An event which fires for every received audio packet, + // containing the decoded data. + let data: &[u8] = &packet.payload.as_slice()[*payload_offset..(packet.payload.len()-payload_end_pad)]; + let packet = OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusMusic, data }); + if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await { + eprint!("Can't send voice to sender: {}",e); + } + if let Some(audio) = audio { + // println!("Audio packet's first 5 samples: {:?}", audio.get(..5.min(audio.len()))); + // // println!( + // // "Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}", + // // packet.sequence.0, + // // audio.len() * std::mem::size_of::(), + // // packet.payload.len(), + // // packet.ssrc, + // // ); + // let mut values_converted = Vec::with_capacity(2*audio.len()); + // for value in audio { + // // maybe "be" ? + // // TODO: we could optimize this, data isn't directly used + // values_converted.extend(&value.to_le_bytes()); + // } + // let packet = + // OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusMusic, data: &values_converted }); + // if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await { + // eprint!("Can't send voice to sender: {}",e); + // } + } else { + println!("RTP packet, but no audio. Driver may not be configured to decode."); + } + }, + Ctx::RtcpPacket {packet, payload_offset, payload_end_pad} => { + // An event which fires for every received rtcp packet, + // containing the call statistics and reporting information. + println!("RTCP packet received: {:?}", packet); + }, + Ctx::ClientConnect( + ClientConnect {audio_ssrc, video_ssrc, user_id, ..} + ) => { + // You can implement your own logic here to handle a user who has joined the + // voice channel e.g., allocate structures, map their SSRC to User ID. + + println!( + "Client connected: user {:?} has audio SSRC {:?}, video SSRC {:?}", + user_id, + audio_ssrc, + video_ssrc, + ); + }, + Ctx::ClientDisconnect( + ClientDisconnect {user_id, ..} + ) => { + // You can implement your own logic here to handle a user who has left the + // voice channel e.g., finalise processing of statistics etc. + // You will typically need to map the User ID to their SSRC; observed when + // speaking or connecting. + + println!("Client disconnected: user {:?}", user_id); + }, + _ => { + // We won't be registering this struct for any more event classes. + unimplemented!() + } + } + + None + } } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index c9b4f97..c53e5f2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::env; +use std::{collections::HashMap, env, sync::Arc}; use serde::Deserialize; use tsclientlib::{ClientId, Connection, DisconnectOptions, Identity, StreamItem}; use tsproto_packets::packets::{AudioData, CodecType, OutAudio, OutPacket}; @@ -20,10 +20,11 @@ struct ConnectionId(u64); // This trait adds the `register_songbird` and `register_songbird_with` methods // to the client builder below, making it easy to install this voice client. // The voice client can be retrieved in any command using `songbird::get(ctx).await`. -use songbird::SerenityInit; +use songbird::{SerenityInit, Songbird}; +use songbird::driver::{Config as DriverConfig, DecodeMode}; // Import the `Context` to handle commands. -use serenity::client::Context; +use serenity::{client::Context, prelude::{RwLock, TypeMapKey}}; use serenity::{ async_trait, @@ -52,6 +53,12 @@ struct Config { volume: f32, } +struct ListenerHolder; + +impl TypeMapKey for ListenerHolder { + type Value = Arc>; +} + #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt::init(); @@ -71,13 +78,35 @@ async fn main() -> Result<()> { .prefix("~")) .group(&discord::GENERAL_GROUP); + // Here, we need to configure Songbird to decode all incoming voice packets. + // If you want, you can do this on a per-call basis---here, we need it to + // read the audio data that other people are sending us! + let songbird = Songbird::serenity(); + songbird.set_config( + DriverConfig::default() + .decode_mode(DecodeMode::Decode) + ); + + let mut client = Client::builder(&config.discord_token) .event_handler(discord::Handler) .framework(framework) - .register_songbird() + .register_songbird_with(songbird.into()) .await .expect("Err creating client"); + let (tx,mut rx) = mpsc::channel(50); + let voice_pipes: Arc> = Arc::new(tx); + { + // Open the data lock in write mode, so keys can be inserted to it. + let mut data = client.data.write().await; + + // The CommandCounter Value has the following type: + // Arc>> + // So, we have to insert the same type to it. + data.insert::(voice_pipes); + } + tokio::spawn(async move { let _ = client.start().await.map_err(|why| println!("Client ended: {:?}", why)); }); @@ -107,13 +136,13 @@ async fn main() -> Result<()> { r?; } - let (send, mut recv) = mpsc::channel(5); - { - let mut a2t = audiodata.a2ts.lock().unwrap(); - a2t.set_listener(send); - a2t.set_volume(config.volume); - a2t.set_playing(true); - } + // let (send, mut recv) = mpsc::channel(5); + // { + // let mut a2t = audiodata.a2ts.lock().unwrap(); + // a2t.set_listener(send); + // a2t.set_volume(config.volume); + // a2t.set_playing(true); + // } loop { let t2a = audiodata.ts2a.clone(); @@ -134,7 +163,15 @@ async fn main() -> Result<()> { // Wait for ctrl + c tokio::select! { - send_audio = recv.recv() => { + // send_audio = recv.recv() => { + // if let Some(packet) = send_audio { + // con.send_audio(packet)?; + // } else { + // info!(logger, "Audio sending stream was canceled"); + // break; + // } + // } + send_audio = rx.recv() => { if let Some(packet) = send_audio { con.send_audio(packet)?; } else { @@ -149,9 +186,10 @@ async fn main() -> Result<()> { } }; } - + println!("Disconnecting"); // Disconnect con.disconnect(DisconnectOptions::new())?; con.events().for_each(|_| future::ready(())).await; + println!("Disconnected"); Ok(()) } \ No newline at end of file