voice-bridge/src/discord.rs
Aron Heinecke b4bbf5e777 Fix audio encoder
Signed-off-by: Aron Heinecke <aron.heinecke@t-online.de>
2021-05-11 00:17:26 +02:00

466 lines
No EOL
16 KiB
Rust

use std::{env, sync::Arc, time::Duration};
use serde::Deserialize;
use tokio::sync::mpsc;
use tsclientlib::{ClientId, Connection, DisconnectOptions, Identity, StreamItem};
use tsproto_packets::packets::{AudioData, CodecType, Direction, OutAudio, OutPacket};
use audiopus::coder::Encoder;
use futures::prelude::*;
use sdl2::audio::{AudioCallback, AudioDevice, AudioSpec, AudioSpecDesired, AudioStatus};
use sdl2::AudioSubsystem;
use serenity::prelude::Mentionable;
// This trait adds the `register_songbird` and `register_songbird_with` methods
// to the client builder below, making it easy to install this voice client.
// The voice client can be retrieved in any command using `songbird::get(ctx).await`.
use songbird::SerenityInit;
// Import the `Context` to handle commands.
use serenity::client::Context;
use serenity::{
async_trait,
client::{Client, EventHandler},
framework::{
StandardFramework,
standard::{
Args, CommandResult,
macros::{command, group},
},
},
model::{channel::Message, gateway::Ready},
Result as SerenityResult,
};
use songbird::{
driver::{Config as DriverConfig, DecodeMode},
model::payload::{ClientConnect, ClientDisconnect, Speaking},
CoreEvent,
Event,
EventContext,
EventHandler as VoiceEventHandler,
Songbird,
};
use crate::ListenerHolder;
pub(crate) struct Handler;
#[async_trait]
impl EventHandler for Handler {
async fn ready(&self, _: Context, ready: Ready) {
println!("{} is connected!", ready.user.name);
}
}
#[group]
#[commands(deafen, join, leave, mute, play, ping, undeafen, unmute)]
pub struct General;
#[derive(Debug,Deserialize)]
struct Config {
discord_token: String,
teamspeak_server: String,
teamspeak_identity: String,
teamspeak_channel: i32,
}
#[command]
#[only_in(guilds)]
async fn deafen(ctx: &Context, msg: &Message) -> CommandResult {
let guild = msg.guild(&ctx.cache).await.unwrap();
let guild_id = guild.id;
let manager = songbird::get(ctx).await
.expect("Songbird Voice client placed in at initialisation.").clone();
let handler_lock = match manager.get(guild_id) {
Some(handler) => handler,
None => {
check_msg(msg.reply(ctx, "Not in a voice channel").await);
return Ok(());
},
};
let mut handler = handler_lock.lock().await;
if handler.is_deaf() {
check_msg(msg.channel_id.say(&ctx.http, "Already deafened").await);
} else {
if let Err(e) = handler.deafen(true).await {
check_msg(msg.channel_id.say(&ctx.http, format!("Failed: {:?}", e)).await);
}
check_msg(msg.channel_id.say(&ctx.http, "Deafened").await);
}
Ok(())
}
#[command]
#[only_in(guilds)]
async fn join(ctx: &Context, msg: &Message) -> CommandResult {
let guild = msg.guild(&ctx.cache).await.unwrap();
let guild_id = guild.id;
let channel_id = guild
.voice_states.get(&msg.author.id)
.and_then(|voice_state| voice_state.channel_id);
let connect_to = match channel_id {
Some(channel) => channel,
None => {
check_msg(msg.reply(ctx, "Not in a voice channel").await);
return Ok(());
}
};
let manager = songbird::get(ctx).await
.expect("Songbird Voice client placed in at initialisation.").clone();
let (handler_lock, conn_result) = manager.join(guild_id, connect_to).await;
if let Ok(_) = conn_result {
// NOTE: this skips listening for the actual connection result.
let channel: crate::AudioBufferDiscord;
{
let data_read = ctx.data.read().await;
channel = data_read.get::<ListenerHolder>().expect("Expected CommandCounter in TypeMap.").clone();
}
let mut handler = handler_lock.lock().await;
handler.add_global_event(
CoreEvent::SpeakingStateUpdate.into(),
Receiver::new(channel.clone()),
);
handler.add_global_event(
CoreEvent::SpeakingUpdate.into(),
Receiver::new(channel.clone()),
);
handler.add_global_event(
CoreEvent::VoicePacket.into(),
Receiver::new(channel.clone()),
);
handler.add_global_event(
CoreEvent::RtcpPacket.into(),
Receiver::new(channel.clone()),
);
handler.add_global_event(
CoreEvent::ClientConnect.into(),
Receiver::new(channel.clone()),
);
handler.add_global_event(
CoreEvent::ClientDisconnect.into(),
Receiver::new(channel),
);
check_msg(msg.channel_id.say(&ctx.http, &format!("Joined {}", connect_to.mention())).await);
} else {
check_msg(msg.channel_id.say(&ctx.http, "Error joining the channel").await);
}
Ok(())
}
#[command]
#[only_in(guilds)]
async fn leave(ctx: &Context, msg: &Message) -> CommandResult {
let guild = msg.guild(&ctx.cache).await.unwrap();
let guild_id = guild.id;
let manager = songbird::get(ctx).await
.expect("Songbird Voice client placed in at initialisation.").clone();
let has_handler = manager.get(guild_id).is_some();
if has_handler {
if let Err(e) = manager.remove(guild_id).await {
check_msg(msg.channel_id.say(&ctx.http, format!("Failed: {:?}", e)).await);
}
check_msg(msg.channel_id.say(&ctx.http, "Left voice channel").await);
} else {
check_msg(msg.reply(ctx, "Not in a voice channel").await);
}
Ok(())
}
#[command]
#[only_in(guilds)]
async fn mute(ctx: &Context, msg: &Message) -> CommandResult {
let guild = msg.guild(&ctx.cache).await.unwrap();
let guild_id = guild.id;
let manager = songbird::get(ctx).await
.expect("Songbird Voice client placed in at initialisation.").clone();
let handler_lock = match manager.get(guild_id) {
Some(handler) => handler,
None => {
check_msg(msg.reply(ctx, "Not in a voice channel").await);
return Ok(());
},
};
let mut handler = handler_lock.lock().await;
if handler.is_mute() {
check_msg(msg.channel_id.say(&ctx.http, "Already muted").await);
} else {
if let Err(e) = handler.mute(true).await {
check_msg(msg.channel_id.say(&ctx.http, format!("Failed: {:?}", e)).await);
}
check_msg(msg.channel_id.say(&ctx.http, "Now muted").await);
}
Ok(())
}
#[command]
async fn ping(context: &Context, msg: &Message) -> CommandResult {
check_msg(msg.channel_id.say(&context.http, "Pong!").await);
Ok(())
}
#[command]
#[only_in(guilds)]
async fn play(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult {
let url = match args.single::<String>() {
Ok(url) => url,
Err(_) => {
check_msg(msg.channel_id.say(&ctx.http, "Must provide a URL to a video or audio").await);
return Ok(());
},
};
if !url.starts_with("http") {
check_msg(msg.channel_id.say(&ctx.http, "Must provide a valid URL").await);
return Ok(());
}
let guild = msg.guild(&ctx.cache).await.unwrap();
let guild_id = guild.id;
let manager = songbird::get(ctx).await
.expect("Songbird Voice client placed in at initialisation.").clone();
if let Some(handler_lock) = manager.get(guild_id) {
let mut handler = handler_lock.lock().await;
let source = match songbird::ytdl(&url).await {
Ok(source) => source,
Err(why) => {
println!("Err starting source: {:?}", why);
check_msg(msg.channel_id.say(&ctx.http, "Error sourcing ffmpeg").await);
return Ok(());
},
};
handler.play_source(source);
check_msg(msg.channel_id.say(&ctx.http, "Playing song").await);
} else {
check_msg(msg.channel_id.say(&ctx.http, "Not in a voice channel to play in").await);
}
Ok(())
}
#[command]
#[only_in(guilds)]
async fn undeafen(ctx: &Context, msg: &Message) -> CommandResult {
let guild = msg.guild(&ctx.cache).await.unwrap();
let guild_id = guild.id;
let manager = songbird::get(ctx).await
.expect("Songbird Voice client placed in at initialisation.").clone();
if let Some(handler_lock) = manager.get(guild_id) {
let mut handler = handler_lock.lock().await;
if let Err(e) = handler.deafen(false).await {
check_msg(msg.channel_id.say(&ctx.http, format!("Failed: {:?}", e)).await);
}
check_msg(msg.channel_id.say(&ctx.http, "Undeafened").await);
} else {
check_msg(msg.channel_id.say(&ctx.http, "Not in a voice channel to undeafen in").await);
}
Ok(())
}
#[command]
#[only_in(guilds)]
async fn unmute(ctx: &Context, msg: &Message) -> CommandResult {
let guild = msg.guild(&ctx.cache).await.unwrap();
let guild_id = guild.id;
let manager = songbird::get(ctx).await
.expect("Songbird Voice client placed in at initialisation.").clone();
if let Some(handler_lock) = manager.get(guild_id) {
let mut handler = handler_lock.lock().await;
if let Err(e) = handler.mute(false).await {
check_msg(msg.channel_id.say(&ctx.http, format!("Failed: {:?}", e)).await);
}
check_msg(msg.channel_id.say(&ctx.http, "Unmuted").await);
} else {
check_msg(msg.channel_id.say(&ctx.http, "Not in a voice channel to unmute in").await);
}
Ok(())
}
/// Checks that a message successfully sent; if not, then logs why to stdout.
fn check_msg(result: SerenityResult<Message>) {
if let Err(why) = result {
println!("Error sending message: {:?}", why);
}
}
struct Receiver{
sink: crate::AudioBufferDiscord,
}
impl Receiver {
pub fn new(voice_receiver: crate::AudioBufferDiscord) -> Self {
// You can manage state here, such as a buffer of audio packet bytes so
// you can later store them in intervals.
Self {
sink: voice_receiver,
}
}
}
#[async_trait]
impl VoiceEventHandler for Receiver {
#[allow(unused_variables)]
async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {
use EventContext as Ctx;
match ctx {
Ctx::SpeakingStateUpdate(
Speaking {speaking, ssrc, user_id, ..}
) => {
// Discord voice calls use RTP, where every sender uses a randomly allocated
// *Synchronisation Source* (SSRC) to allow receivers to tell which audio
// stream a received packet belongs to. As this number is not derived from
// the sender's user_id, only Discord Voice Gateway messages like this one
// inform us about which random SSRC a user has been allocated. Future voice
// packets will contain *only* the SSRC.
//
// You can implement logic here so that you can differentiate users'
// SSRCs and map the SSRC to the User ID and maintain this state.
// Using this map, you can map the `ssrc` in `voice_packet`
// to the user ID and handle their audio packets separately.
//println!(
// "Speaking state update: user {:?} has SSRC {:?}, using {:?}",
// user_id,
// ssrc,
// speaking,
// );
},
Ctx::SpeakingUpdate {ssrc, speaking} => {
// You can implement logic here which reacts to a user starting
// or stopping speaking.
//println!(
// "Source {} has {} speaking.",
// ssrc,
// if *speaking {"started"} else {"stopped"},
// );
},
Ctx::VoicePacket {audio, packet, payload_offset, payload_end_pad} => {
// An event which fires for every received audio packet,
// containing the decoded data.
// let data: &[u8] = &packet.payload.as_slice()[*payload_offset..(packet.payload.len()-payload_end_pad)];
// let packet = OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusVoice, data });
// if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await {
// eprint!("Can't send voice to sender: {}",e);
// }
if let Some(audio) = audio {
{
let time = std::time::Instant::now();
let mut lock = self.sink.lock().await;
let dur = time.elapsed();
if dur.as_millis() > 1 {
eprintln!("Akquiring lock took {}ms",dur.as_millis());
}
if let Some(buffer) = lock.get_mut(&packet.ssrc) {
buffer.extend(audio);
} else {
// TODO: can we skip this clone ?
let _ = lock.insert(packet.ssrc, audio.clone());
}
}
// println!("Audio packet's first 5 samples: {:?}", audio.get(..5.min(audio.len())));
// // println!(
// // "Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}",
// // packet.sequence.0,
// // audio.len() * std::mem::size_of::<i16>(),
// // packet.payload.len(),
// // packet.ssrc,
// // );
// let mut values_converted = Vec::with_capacity(2*audio.len());
// for value in audio {
// // maybe "be" ?
// // TODO: we could optimize this, data isn't directly used
// values_converted.extend(&value.to_le_bytes());
// }
// let packet =
// OutAudio::new(&AudioData::C2S { id: 0, codec: CodecType::OpusMusic, data: &values_converted });
// if let Err(e) = self.sink.send_timeout(packet, Duration::from_millis(10)).await {
// eprint!("Can't send voice to sender: {}",e);
// }
} else {
println!("RTP packet, but no audio. Driver may not be configured to decode.");
}
},
Ctx::RtcpPacket {packet, payload_offset, payload_end_pad} => {
// An event which fires for every received rtcp packet,
// containing the call statistics and reporting information.
//println!("RTCP packet received: {:?}", packet);
},
Ctx::ClientConnect(
ClientConnect {audio_ssrc, video_ssrc, user_id, ..}
) => {
// You can implement your own logic here to handle a user who has joined the
// voice channel e.g., allocate structures, map their SSRC to User ID.
println!(
"Client connected: user {:?} has audio SSRC {:?}, video SSRC {:?}",
user_id,
audio_ssrc,
video_ssrc,
);
},
Ctx::ClientDisconnect(
ClientDisconnect {user_id, ..}
) => {
// You can implement your own logic here to handle a user who has left the
// voice channel e.g., finalise processing of statistics etc.
// You will typically need to map the User ID to their SSRC; observed when
// speaking or connecting.
println!("Client disconnected: user {:?}", user_id);
},
_ => {
// We won't be registering this struct for any more event classes.
unimplemented!()
}
}
None
}
}