diff --git a/src/main/java/de/strifel/VTools/listeners/TGBridge.java b/src/main/java/de/strifel/VTools/listeners/TGBridge.java index fe992aa..666d39f 100644 --- a/src/main/java/de/strifel/VTools/listeners/TGBridge.java +++ b/src/main/java/de/strifel/VTools/listeners/TGBridge.java @@ -4,9 +4,12 @@ import com.google.common.collect.ImmutableList; import com.pengrad.telegrambot.Callback; import com.pengrad.telegrambot.TelegramBot; import com.pengrad.telegrambot.UpdatesListener; +import com.pengrad.telegrambot.model.Message; import com.pengrad.telegrambot.model.Update; import com.pengrad.telegrambot.model.User; +import com.pengrad.telegrambot.request.EditMessageText; import com.pengrad.telegrambot.request.SendMessage; +import com.pengrad.telegrambot.response.BaseResponse; import com.pengrad.telegrambot.response.SendResponse; import com.velocitypowered.api.event.Subscribe; import com.velocitypowered.api.event.connection.DisconnectEvent; @@ -17,6 +20,8 @@ import com.velocitypowered.api.proxy.ProxyServer; import com.velocitypowered.api.proxy.server.RegisteredServer; import de.strifel.VTools.VTools; import net.kyori.adventure.text.Component; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.yaml.snakeyaml.Yaml; import java.io.IOException; @@ -27,6 +32,8 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.BiConsumer; import java.util.stream.Collectors; public class TGBridge { @@ -36,11 +43,14 @@ public class TGBridge { private TelegramBot bot; private String TOKEN = ""; + private long BOT_ID = -1; private long CHAT_ID = 0L; + private int ONLINE_STATUS_MESSAGE_ID = -1; + private long backoffSec = 1L; - public TGBridge(VTools plugin) { + public TGBridge(@NotNull VTools plugin) { INSTANCE = this; this.plugin = plugin; this.server = plugin.getServer(); @@ -49,6 +59,7 @@ public class TGBridge { public void register() { server.getEventManager().register(plugin, this); botInit(); + initUpdateThread(); } private void loadConfig() { @@ -67,6 +78,7 @@ public class TGBridge { synchronized (this) { this.CHAT_ID = Long.parseLong(config.getOrDefault("chat_id", "0")); this.TOKEN = config.getOrDefault("token", ""); + this.BOT_ID = Long.parseLong(TOKEN.split(":")[0]); } } catch (Exception e) { plugin.logger.error("parsing config", e); @@ -82,51 +94,54 @@ public class TGBridge { for (Update update : updates) { try { if (update != null && - update.message() != null && - update.message().chat() != null && - update.message().chat().id() == CHAT_ID && - update.message().from() != null + update.message() != null && + update.message().chat() != null && + update.message().chat().id() == CHAT_ID && + update.message().from() != null ) { if (update.message().text() != null && !update.message().text().isEmpty()) { String msg = update.message().text(); - if (msg.equals("/list")) { - ArrayList out = new ArrayList<>(); - String fmt = server.getAllPlayers().size() > 1 ? "%d players are currently connected to the proxy." : "%d player is currently connected to the proxy."; - out.add(String.format(fmt, server.getAllPlayers().size())); - List servers = new ArrayList<>(server.getAllServers()); - for (RegisteredServer server : servers) { - List onServer = ImmutableList.copyOf(server.getPlayersConnected()); - if (!onServer.isEmpty()) { - out.add(String.format("[%s] (%d): %s", - server.getServerInfo().getName(), - onServer.size(), - onServer.stream().map(Player::getUsername).collect(Collectors.joining(", "))) - ); - + switch (msg) { + case "/list": + outbound(genOnlineStatus()); + break; + case "/setpin": + Message replyTo = update.message().replyToMessage(); + if (replyTo == null || replyTo.from() == null || replyTo.from().id() != BOT_ID) { + outbound("must reply a message that from the bot"); + } else { + int messageId = replyTo.messageId(); + ONLINE_STATUS_MESSAGE_ID = messageId > 0 ? messageId : ONLINE_STATUS_MESSAGE_ID; + outbound("done"); } - } - outbound(String.join("\n", out)); + break; + case "/genpin": + outbound(genOnlineStatus(), + (sendMessage, sendResponse) -> { + if (!sendResponse.isOk()) { + plugin.logger.error(String.format("sendMessage error %d: %s", sendResponse.errorCode(), sendResponse.description())); + } else { + int messageId = sendResponse.message().messageId(); + ONLINE_STATUS_MESSAGE_ID = messageId > 0 ? messageId : ONLINE_STATUS_MESSAGE_ID; + } + } + ); + break; } tgInbound(update.message().from(), msg); - } - else if (update.message().sticker() != null) { + } else if (update.message().sticker() != null) { tgInbound(update.message().from(), "[sticker]"); - } - else if (update.message().photo() != null) { + } else if (update.message().photo() != null) { tgInbound(update.message().from(), "[photo]"); - } - else if (update.message().audio() != null) { + } else if (update.message().audio() != null) { tgInbound(update.message().from(), "[audio]"); - } - else if (update.message().voice() != null) { + } else if (update.message().voice() != null) { tgInbound(update.message().from(), "[voice]"); - } - else if (update.message().document() != null) { + } else if (update.message().document() != null) { tgInbound(update.message().from(), "[document]"); } } - } - catch (Exception e) { + } catch (Exception e) { plugin.logger.error("handling update", e); } } @@ -134,7 +149,7 @@ public class TGBridge { }, (e) -> { plugin.logger.error("getting update", e); plugin.logger.error(String.format("waiting %ds before getting another update", backoffSec)); - try { Thread.sleep(backoffSec * 1000); } catch (InterruptedException ignored) {} + try {Thread.sleep(backoffSec * 1000);} catch (InterruptedException ignored) {} backoffSec *= 2L; if (backoffSec > 3600) { backoffSec = 3600; @@ -142,6 +157,24 @@ public class TGBridge { }); } + private String genOnlineStatus() { + ArrayList out = new ArrayList<>(); + String fmt = server.getAllPlayers().size() > 1 ? "%d players are currently connected to the proxy." : "%d player is currently connected to the proxy."; + out.add(String.format(fmt, server.getAllPlayers().size())); + List registeredServers = new ArrayList<>(server.getAllServers()); + for (RegisteredServer registeredServer : registeredServers) { + List onServer = ImmutableList.copyOf(registeredServer.getPlayersConnected()); + if (!onServer.isEmpty()) { + out.add(String.format("[%s] (%d): %s", + registeredServer.getServerInfo().getName(), + onServer.size(), + onServer.stream().map(Player::getUsername).collect(Collectors.joining(", "))) + ); + } + } + return String.join("\n", out); + } + protected void tgInbound(User user, String content) { inbound(String.format("[tg] <%s> %s", user.lastName() == null ? user.firstName() : String.format("%s %s", user.firstName(), user.lastName()), content)); } @@ -155,6 +188,10 @@ public class TGBridge { } protected void outbound(String content) { + outbound(content, null); + } + + protected void outbound(String content, @Nullable BiConsumer onResponse) { if (bot == null) return; if (content.length() > 4000) { content = content.substring(0, 4000); @@ -162,8 +199,12 @@ public class TGBridge { bot.execute(new SendMessage(CHAT_ID, content), new Callback() { @Override public void onResponse(SendMessage sendMessage, SendResponse sendResponse) { - if (!sendResponse.isOk()) { - plugin.logger.error(String.format("sendMessage error %d: %s", sendResponse.errorCode(), sendResponse.description())); + if (onResponse == null) { + if (!sendResponse.isOk()) { + plugin.logger.error(String.format("sendMessage error %d: %s", sendResponse.errorCode(), sendResponse.description())); + } + } else { + onResponse.accept(sendMessage, sendResponse); } } @@ -179,6 +220,7 @@ public class TGBridge { if (bot == null) return; bot.removeGetUpdatesListener(); bot.shutdown(); + PROXY_SHUT_DOWN = true; } @Subscribe @@ -186,6 +228,7 @@ public class TGBridge { if (event.getPreviousServer().isEmpty()) { if (!event.getPlayer().hasPermission("vtools.globalchat.bypassbridge.join")) { outbound(String.format("%s joined the proxy", event.getPlayer().getUsername())); + updateRequests.add(new UpdateRequest()); } } } @@ -194,6 +237,52 @@ public class TGBridge { public void onDisconnect(DisconnectEvent event) { if (!event.getPlayer().hasPermission("vtools.globalchat.bypassbridge.join")) { outbound(String.format("%s left the proxy", event.getPlayer().getUsername())); + updateRequests.add(new UpdateRequest()); } } + + + private boolean PROXY_SHUT_DOWN = false; + + private static class UpdateRequest {} + + private LinkedBlockingQueue updateRequests = new LinkedBlockingQueue<>(); + + private void initUpdateThread() { + new Thread(() -> { + while (true) { + if (PROXY_SHUT_DOWN) { + setOnlineStatusNotAvailable(); + } + UpdateRequest oldestRequest = null; + try { + oldestRequest = updateRequests.take(); + } catch (InterruptedException ignored) {} + if (oldestRequest == null) continue; + updateRequests.clear(); + if (!updateOnlineStatus()) { + updateRequests.add(oldestRequest); // 更新失败 回去吧您内 + } + } + }).start(); + } + + protected boolean updateOnlineStatus() { + return editOnlineStatusMessage(genOnlineStatus()); + } + + protected boolean setOnlineStatusNotAvailable() { + return editOnlineStatusMessage("(proxy already shutdown)"); + } + + protected boolean editOnlineStatusMessage(String text) { + if (ONLINE_STATUS_MESSAGE_ID < 1) { + return true; + } + BaseResponse response; + try { + response = bot.execute(new EditMessageText(CHAT_ID, ONLINE_STATUS_MESSAGE_ID, text)); + } catch (RuntimeException e) {return false;} + return response != null && response.isOk(); + } }