Add teamspeak->discord pipeline

Signed-off-by: Aron Heinecke <aron.heinecke@t-online.de>
This commit is contained in:
Aron Heinecke 2021-05-22 03:01:37 +02:00
parent 818549f38c
commit 4cbcd275e8
4 changed files with 74 additions and 39 deletions

7
Cargo.lock generated
View file

@ -227,6 +227,12 @@ version = "3.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe"
[[package]]
name = "byte-slice-cast"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65c1bf4a04a88c54f589125563643d773f3254b5c38571395e2b591c693bbc81"
[[package]] [[package]]
name = "byteorder" name = "byteorder"
version = "1.4.3" version = "1.4.3"
@ -3175,6 +3181,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"audiopus", "audiopus",
"byte-slice-cast",
"futures", "futures",
"sdl2", "sdl2",
"serde", "serde",

View file

@ -10,6 +10,9 @@ edition = "2018"
toml = "0.5" toml = "0.5"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
# pushing [f32] to [u8] and vice verca
byte-slice-cast = "1"
# tokio tracing from songbird # tokio tracing from songbird
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.2" tracing-subscriber = "0.2"
@ -53,4 +56,4 @@ features = ["client", "standard_framework", "voice","native_tls_backend"]
[dependencies.tokio] [dependencies.tokio]
version = "1.0" version = "1.0"
features = ["macros", "rt-multi-thread","signal"] features = ["macros", "rt-multi-thread","signal", "sync"]

View file

@ -12,7 +12,7 @@ use serenity::prelude::Mentionable;
// This trait adds the `register_songbird` and `register_songbird_with` methods // This trait adds the `register_songbird` and `register_songbird_with` methods
// to the client builder below, making it easy to install this voice client. // to the client builder below, making it easy to install this voice client.
// The voice client can be retrieved in any command using `songbird::get(ctx).await`. // The voice client can be retrieved in any command using `songbird::get(ctx).await`.
use songbird::SerenityInit; use songbird::{SerenityInit, input::Input};
// Import the `Context` to handle commands. // Import the `Context` to handle commands.
use serenity::client::Context; use serenity::client::Context;
@ -123,12 +123,16 @@ async fn join(ctx: &Context, msg: &Message) -> CommandResult {
if let Ok(_) = conn_result { if let Ok(_) = conn_result {
// NOTE: this skips listening for the actual connection result. // NOTE: this skips listening for the actual connection result.
let channel: crate::AudioBufferDiscord; let channel: crate::AudioBufferDiscord;
let ts_buffer: crate::TsToDiscordPipeline;
{ {
let data_read = ctx.data.read().await; let data_read = ctx.data.read().await;
channel = data_read.get::<ListenerHolder>().expect("Expected CommandCounter in TypeMap.").clone(); let (ts_buf,chan) = data_read.get::<ListenerHolder>().expect("Expected CommandCounter in TypeMap.").clone();
channel = chan;
ts_buffer = ts_buf;
} }
let mut handler = handler_lock.lock().await; let mut handler = handler_lock.lock().await;
let discord_input = Input::float_pcm(true, songbird::input::Reader::Extension(Box::new(ts_buffer.clone())));
handler.play_only_source(discord_input);
handler.add_global_event( handler.add_global_event(
CoreEvent::SpeakingStateUpdate.into(), CoreEvent::SpeakingStateUpdate.into(),
Receiver::new(channel.clone()), Receiver::new(channel.clone()),
@ -395,7 +399,7 @@ impl VoiceEventHandler for Receiver {
let mut lock = self.sink.lock().await; let mut lock = self.sink.lock().await;
let dur = time.elapsed(); let dur = time.elapsed();
if dur.as_millis() > 1 { if dur.as_millis() > 1 {
eprintln!("Akquiring lock took {}ms",dur.as_millis()); eprintln!("Acquiring lock took {}ms",dur.as_millis());
} }
if let Some(buffer) = lock.get_mut(&packet.ssrc) { if let Some(buffer) = lock.get_mut(&packet.ssrc) {
buffer.extend(audio); buffer.extend(audio);

View file

@ -1,24 +1,27 @@
use std::{collections::HashMap, sync::Arc, time::Duration}; use std::{collections::HashMap, io::{Read, copy}, mem::size_of, sync::Arc, time::Duration};
use byte_slice_cast::{AsByteSlice, AsMutByteSlice};
use serde::Deserialize; use serde::Deserialize;
use tsclientlib::{ClientId, Connection, DisconnectOptions, Identity, StreamItem}; use tsclientlib::{ClientId, Connection, DisconnectOptions, Identity, StreamItem, audio::AudioHandler};
use tsproto_packets::packets::{AudioData, CodecType, OutAudio, OutPacket}; use tsproto_packets::packets::{AudioData, CodecType, OutAudio, OutPacket};
use audiopus::coder::Encoder; use audiopus::coder::Encoder;
use futures::{lock::Mutex, prelude::*}; use futures::prelude::*;
use slog::{debug, o, Drain, Logger}; use slog::{debug, o, Drain, Logger};
use tokio::{task}; use tokio::task;
use tokio::task::LocalSet; use tokio::task::LocalSet;
use tokio::sync::Mutex;
use anyhow::*; use anyhow::*;
mod ts_voice; mod ts_voice;
mod discord; mod discord;
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct ConnectionId(u64); struct ConnectionId(u64);
// This trait adds the `register_songbird` and `register_songbird_with` methods // This trait adds the `register_songbird` and `register_songbird_with` methods
// to the client builder below, making it easy to install this voice client. // to the client builder below, making it easy to install this voice client.
// The voice client can be retrieved in any command using `songbird::get(ctx).await`. // The voice client can be retrieved in any command using `songbird::get(ctx).await`.
use songbird::{SerenityInit, Songbird}; use songbird::{SerenityInit, Songbird, input::Input};
use songbird::driver::{Config as DriverConfig, DecodeMode}; use songbird::driver::{Config as DriverConfig, DecodeMode};
// Import the `Context` to handle commands. // Import the `Context` to handle commands.
@ -56,8 +59,42 @@ struct ListenerHolder;
//TODO: stop shooting myself in the knee with a mutex //TODO: stop shooting myself in the knee with a mutex
type AudioBufferDiscord = Arc<Mutex<HashMap<u32,Vec<i16>>>>; type AudioBufferDiscord = Arc<Mutex<HashMap<u32,Vec<i16>>>>;
type TsVoiceId = (ConnectionId, ClientId);
type TsAudioHandler = tsclientlib::audio::AudioHandler<TsVoiceId>;
#[derive(Clone)]
struct TsToDiscordPipeline {
data: Arc<std::sync::Mutex<TsAudioHandler>>,
}
impl TsToDiscordPipeline {
pub fn new(logger: Logger) -> Self {
Self {
data: Arc::new(std::sync::Mutex::new(TsAudioHandler::new(logger)))
}
}
}
impl Read for TsToDiscordPipeline {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// TODO: can't we support async read for songbird ? this is kinda bad..
let mut lock = self.data.lock().expect("Can't lock ts voice buffer!");
// and this is really ugly.. read only works for u8, but we get an f32 and need to convert that without chaning AudioHandlers API
// also Read for stuff that specifies to use f32 is kinda meh
let len = buf.len() / size_of::<f32>();
let mut wtr: Vec<f32> = vec![0.0; len];
lock.fill_buffer(wtr.as_mut_slice());
let slice = wtr.as_byte_slice();
buf.copy_from_slice(slice);
Ok(buf.len())
}
}
impl TypeMapKey for ListenerHolder { impl TypeMapKey for ListenerHolder {
type Value = AudioBufferDiscord; type Value = (TsToDiscordPipeline,AudioBufferDiscord);
} }
const TICK_TIME: u64 = 15; const TICK_TIME: u64 = 15;
@ -101,8 +138,11 @@ async fn main() -> Result<()> {
.await .await
.expect("Err creating client"); .expect("Err creating client");
let ts_voice_logger = logger.new(o!("pipeline" => "voice-ts"));
let teamspeak_voice_handler = TsToDiscordPipeline::new(logger);
let map = HashMap::new(); let map = HashMap::new();
let voice_buffer: AudioBufferDiscord = Arc::new(Mutex::new(map)); let discord_voice_buffer: AudioBufferDiscord = Arc::new(Mutex::new(map));
{ {
// Open the data lock in write mode, so keys can be inserted to it. // Open the data lock in write mode, so keys can be inserted to it.
let mut data = client.data.write().await; let mut data = client.data.write().await;
@ -110,7 +150,7 @@ async fn main() -> Result<()> {
// The CommandCounter Value has the following type: // The CommandCounter Value has the following type:
// Arc<RwLock<HashMap<String, u64>>> // Arc<RwLock<HashMap<String, u64>>>
// So, we have to insert the same type to it. // So, we have to insert the same type to it.
data.insert::<ListenerHolder>(voice_buffer.clone()); data.insert::<ListenerHolder>((teamspeak_voice_handler.clone(),discord_voice_buffer.clone()));
} }
tokio::spawn(async move { tokio::spawn(async move {
@ -118,8 +158,6 @@ async fn main() -> Result<()> {
}); });
let con_id = ConnectionId(0); let con_id = ConnectionId(0);
let local_set = LocalSet::new();
let audiodata = ts_voice::start(logger.clone(), &local_set)?;
let con_config = Connection::build(config.teamspeak_server) let con_config = Connection::build(config.teamspeak_server)
.log_commands(config.verbose >= 1) .log_commands(config.verbose >= 1)
@ -141,14 +179,6 @@ async fn main() -> Result<()> {
if let Some(r) = r { if let Some(r) = r {
r?; r?;
} }
// let (send, mut recv) = mpsc::channel(5);
// {
// let mut a2t = audiodata.a2ts.lock().unwrap();
// a2t.set_listener(send);
// a2t.set_volume(config.volume);
// a2t.set_playing(true);
// }
let encoder = audiopus::coder::Encoder::new( let encoder = audiopus::coder::Encoder::new(
audiopus::SampleRate::Hz48000, audiopus::SampleRate::Hz48000,
audiopus::Channels::Stereo, audiopus::Channels::Stereo,
@ -157,17 +187,7 @@ async fn main() -> Result<()> {
let encoder = Arc::new(Mutex::new(encoder)); let encoder = Arc::new(Mutex::new(encoder));
let mut interval = tokio::time::interval(Duration::from_millis(TICK_TIME)); let mut interval = tokio::time::interval(Duration::from_millis(TICK_TIME));
// tokio::spawn(async {
// loop {
// interval.tick().await;
// if let Err(e) = con.send_audio() {
// println!("Failed to send audio to teamspeak: {}",e);
// }
// }
// });
loop { loop {
let t2a = audiodata.ts2a.clone();
let events = con.events().try_for_each(|e| async { let events = con.events().try_for_each(|e| async {
if let StreamItem::Audio(packet) = e { if let StreamItem::Audio(packet) = e {
let from = ClientId(match packet.data().data() { let from = ClientId(match packet.data().data() {
@ -176,10 +196,11 @@ async fn main() -> Result<()> {
_ => panic!("Can only handle S2C packets but got a C2S packet"), _ => panic!("Can only handle S2C packets but got a C2S packet"),
}); });
// let mut t2a = t2a.lock().unwrap(); let mut ts_voice: std::sync::MutexGuard<TsAudioHandler> = teamspeak_voice_handler.data.lock().expect("Can't lock ts audio buffer!");
// if let Err(e) = t2a.play_packet((con_id, from), packet) { if let Err(e) = ts_voice.handle_packet((con_id, from), packet) {
//debug!(logger, "Failed to play packet"; "error" => %e); //debug!(logger, "Failed to play packet"; "error" => %e);
// } eprintln!("Failed to play TS_Voice packet {}",e);
}
} }
Ok(()) Ok(())
}); });
@ -209,7 +230,7 @@ async fn main() -> Result<()> {
// eprintln!("Tick took {}ms",dur.as_millis()); // eprintln!("Tick took {}ms",dur.as_millis());
// } // }
let start = std::time::Instant::now(); let start = std::time::Instant::now();
if let Some(processed) = process_audio(&voice_buffer,&encoder).await { if let Some(processed) = process_audio(&discord_voice_buffer,&encoder).await {
con.send_audio(processed)?; con.send_audio(processed)?;
let dur = start.elapsed(); let dur = start.elapsed();
if dur >= Duration::from_millis(1) { if dur >= Duration::from_millis(1) {