diff --git a/src/main/java/de/strifel/VTools/listeners/TGBridge.java b/src/main/java/de/strifel/VTools/listeners/TGBridge.java index fe992aa..e5fada4 100644 --- a/src/main/java/de/strifel/VTools/listeners/TGBridge.java +++ b/src/main/java/de/strifel/VTools/listeners/TGBridge.java @@ -4,19 +4,25 @@ import com.google.common.collect.ImmutableList; import com.pengrad.telegrambot.Callback; import com.pengrad.telegrambot.TelegramBot; import com.pengrad.telegrambot.UpdatesListener; +import com.pengrad.telegrambot.model.Message; import com.pengrad.telegrambot.model.Update; import com.pengrad.telegrambot.model.User; +import com.pengrad.telegrambot.request.EditMessageText; import com.pengrad.telegrambot.request.SendMessage; +import com.pengrad.telegrambot.response.BaseResponse; import com.pengrad.telegrambot.response.SendResponse; import com.velocitypowered.api.event.Subscribe; import com.velocitypowered.api.event.connection.DisconnectEvent; import com.velocitypowered.api.event.player.ServerConnectedEvent; +import com.velocitypowered.api.event.player.ServerPostConnectEvent; import com.velocitypowered.api.event.proxy.ProxyShutdownEvent; import com.velocitypowered.api.proxy.Player; import com.velocitypowered.api.proxy.ProxyServer; import com.velocitypowered.api.proxy.server.RegisteredServer; import de.strifel.VTools.VTools; import net.kyori.adventure.text.Component; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.yaml.snakeyaml.Yaml; import java.io.IOException; @@ -27,6 +33,8 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.BiConsumer; import java.util.stream.Collectors; public class TGBridge { @@ -36,11 +44,14 @@ public class TGBridge { private TelegramBot bot; private String TOKEN = ""; + private long BOT_ID = -1; private long CHAT_ID = 0L; + private int ONLINE_STATUS_MESSAGE_ID = -1; + private long backoffSec = 1L; - public TGBridge(VTools plugin) { + public TGBridge(@NotNull VTools plugin) { INSTANCE = this; this.plugin = plugin; this.server = plugin.getServer(); @@ -49,6 +60,7 @@ public class TGBridge { public void register() { server.getEventManager().register(plugin, this); botInit(); + initUpdateThread(); } private void loadConfig() { @@ -67,6 +79,7 @@ public class TGBridge { synchronized (this) { this.CHAT_ID = Long.parseLong(config.getOrDefault("chat_id", "0")); this.TOKEN = config.getOrDefault("token", ""); + this.BOT_ID = Long.parseLong(TOKEN.split(":")[0]); } } catch (Exception e) { plugin.logger.error("parsing config", e); @@ -82,51 +95,54 @@ public class TGBridge { for (Update update : updates) { try { if (update != null && - update.message() != null && - update.message().chat() != null && - update.message().chat().id() == CHAT_ID && - update.message().from() != null + update.message() != null && + update.message().chat() != null && + update.message().chat().id() == CHAT_ID && + update.message().from() != null ) { if (update.message().text() != null && !update.message().text().isEmpty()) { String msg = update.message().text(); - if (msg.equals("/list")) { - ArrayList out = new ArrayList<>(); - String fmt = server.getAllPlayers().size() > 1 ? "%d players are currently connected to the proxy." : "%d player is currently connected to the proxy."; - out.add(String.format(fmt, server.getAllPlayers().size())); - List servers = new ArrayList<>(server.getAllServers()); - for (RegisteredServer server : servers) { - List onServer = ImmutableList.copyOf(server.getPlayersConnected()); - if (!onServer.isEmpty()) { - out.add(String.format("[%s] (%d): %s", - server.getServerInfo().getName(), - onServer.size(), - onServer.stream().map(Player::getUsername).collect(Collectors.joining(", "))) - ); - + switch (msg) { + case "/list": + outbound(genOnlineStatus()); + break; + case "/setpin": + Message replyTo = update.message().replyToMessage(); + if (replyTo == null || replyTo.from() == null || replyTo.from().id() != BOT_ID) { + outbound("must reply a message that from the bot"); + } else { + int messageId = replyTo.messageId(); + ONLINE_STATUS_MESSAGE_ID = messageId > 0 ? messageId : ONLINE_STATUS_MESSAGE_ID; + outbound("done"); } - } - outbound(String.join("\n", out)); + break; + case "/genpin": + outbound(genOnlineStatus(), + (sendMessage, sendResponse) -> { + if (!sendResponse.isOk()) { + plugin.logger.error(String.format("sendMessage error %d: %s", sendResponse.errorCode(), sendResponse.description())); + } else { + int messageId = sendResponse.message().messageId(); + ONLINE_STATUS_MESSAGE_ID = messageId > 0 ? messageId : ONLINE_STATUS_MESSAGE_ID; + } + } + ); + break; } tgInbound(update.message().from(), msg); - } - else if (update.message().sticker() != null) { + } else if (update.message().sticker() != null) { tgInbound(update.message().from(), "[sticker]"); - } - else if (update.message().photo() != null) { + } else if (update.message().photo() != null) { tgInbound(update.message().from(), "[photo]"); - } - else if (update.message().audio() != null) { + } else if (update.message().audio() != null) { tgInbound(update.message().from(), "[audio]"); - } - else if (update.message().voice() != null) { + } else if (update.message().voice() != null) { tgInbound(update.message().from(), "[voice]"); - } - else if (update.message().document() != null) { + } else if (update.message().document() != null) { tgInbound(update.message().from(), "[document]"); } } - } - catch (Exception e) { + } catch (Exception e) { plugin.logger.error("handling update", e); } } @@ -134,7 +150,7 @@ public class TGBridge { }, (e) -> { plugin.logger.error("getting update", e); plugin.logger.error(String.format("waiting %ds before getting another update", backoffSec)); - try { Thread.sleep(backoffSec * 1000); } catch (InterruptedException ignored) {} + try {Thread.sleep(backoffSec * 1000);} catch (InterruptedException ignored) {} backoffSec *= 2L; if (backoffSec > 3600) { backoffSec = 3600; @@ -142,6 +158,24 @@ public class TGBridge { }); } + private String genOnlineStatus() { + ArrayList out = new ArrayList<>(); + String fmt = server.getAllPlayers().size() > 1 ? "%d players are currently connected to the proxy." : "%d player is currently connected to the proxy."; + out.add(String.format(fmt, server.getAllPlayers().size())); + List registeredServers = new ArrayList<>(server.getAllServers()); + for (RegisteredServer registeredServer : registeredServers) { + List onServer = ImmutableList.copyOf(registeredServer.getPlayersConnected()); + if (!onServer.isEmpty()) { + out.add(String.format("[%s] (%d): %s", + registeredServer.getServerInfo().getName(), + onServer.size(), + onServer.stream().map(Player::getUsername).collect(Collectors.joining(", "))) + ); + } + } + return String.join("\n", out); + } + protected void tgInbound(User user, String content) { inbound(String.format("[tg] <%s> %s", user.lastName() == null ? user.firstName() : String.format("%s %s", user.firstName(), user.lastName()), content)); } @@ -155,6 +189,10 @@ public class TGBridge { } protected void outbound(String content) { + outbound(content, null); + } + + protected void outbound(String content, @Nullable BiConsumer onResponse) { if (bot == null) return; if (content.length() > 4000) { content = content.substring(0, 4000); @@ -162,8 +200,12 @@ public class TGBridge { bot.execute(new SendMessage(CHAT_ID, content), new Callback() { @Override public void onResponse(SendMessage sendMessage, SendResponse sendResponse) { - if (!sendResponse.isOk()) { - plugin.logger.error(String.format("sendMessage error %d: %s", sendResponse.errorCode(), sendResponse.description())); + if (onResponse == null) { + if (!sendResponse.isOk()) { + plugin.logger.error(String.format("sendMessage error %d: %s", sendResponse.errorCode(), sendResponse.description())); + } + } else { + onResponse.accept(sendMessage, sendResponse); } } @@ -179,21 +221,195 @@ public class TGBridge { if (bot == null) return; bot.removeGetUpdatesListener(); bot.shutdown(); + PROXY_SHUT_DOWN = true; } @Subscribe public void onServerConnected(ServerConnectedEvent event) { if (event.getPreviousServer().isEmpty()) { if (!event.getPlayer().hasPermission("vtools.globalchat.bypassbridge.join")) { - outbound(String.format("%s joined the proxy", event.getPlayer().getUsername())); + joinLeftAnnounce(String.format("%s joined the proxy", event.getPlayer().getUsername())); } } + updateRequests.add(new UpdateRequest()); } @Subscribe public void onDisconnect(DisconnectEvent event) { if (!event.getPlayer().hasPermission("vtools.globalchat.bypassbridge.join")) { - outbound(String.format("%s left the proxy", event.getPlayer().getUsername())); + joinLeftAnnounce(String.format("%s left the proxy", event.getPlayer().getUsername())); + } + updateRequests.add(new UpdateRequest(2)); + } + + @Subscribe + public void onServerPostConnect(ServerPostConnectEvent event){ + updateRequests.add(new UpdateRequest()); + } + + + + private boolean PROXY_SHUT_DOWN = false; + + private static class UpdateRequest { + int updateTimes; + + UpdateRequest() { + this(1); + } + + UpdateRequest(int updateTimes) { + this.updateTimes = updateTimes; } } + + private LinkedBlockingQueue updateRequests = new LinkedBlockingQueue<>(); + + private void initUpdateThread() { + new Thread(() -> { + while (true) { + if (PROXY_SHUT_DOWN) { + setOnlineStatusNotAvailable(); + return; + } + int maxiterationNum = 0; + UpdateRequest oldestRequest = null; + try { + oldestRequest = updateRequests.take(); + } catch (InterruptedException ignored) {} + if (oldestRequest == null) continue; + maxiterationNum = Math.max(maxiterationNum, oldestRequest.updateTimes - 1); + while (!updateRequests.isEmpty()){ + try { + maxiterationNum = Math.max(maxiterationNum, updateRequests.take().updateTimes - 1); + } catch (InterruptedException ignored) {} + } + + if (!updateOnlineStatus()) { + updateRequests.add(oldestRequest); // 更新失败 回去吧您内 + } + if(maxiterationNum>0){ + updateRequests.add(new UpdateRequest(maxiterationNum)); + } + } + }).start(); + new Thread(()->{ + while (true) { + if (PROXY_SHUT_DOWN) { + return; + } + String oldestMessage = null; + try { + oldestMessage = announceQueue.take(); + } catch (InterruptedException ignored) {} + + if(!currentAnnounce.isValid()){ + currentAnnounce = new JoinLeftAnnounceMessage(oldestMessage); + continue; + } + ArrayList messages = new ArrayList<>(announceQueue.size() + 1); + messages.add(oldestMessage); + while (!announceQueue.isEmpty()) { + try { + messages.add(announceQueue.take()); + } catch (InterruptedException ignored) {} + } + currentAnnounce.addLines(messages); + } + }).start(); + } + + protected boolean updateOnlineStatus() { + return editOnlineStatusMessage(genOnlineStatus()); + } + + protected boolean setOnlineStatusNotAvailable() { + return editOnlineStatusMessage("(proxy already shutdown)"); + } + + protected boolean editOnlineStatusMessage(String text) { + if (ONLINE_STATUS_MESSAGE_ID < 1) { + return true; + } + BaseResponse response; + try { + response = bot.execute(new EditMessageText(CHAT_ID, ONLINE_STATUS_MESSAGE_ID, text)); + } catch (RuntimeException e) {return false;} + return response != null && response.isOk(); + } + + private class JoinLeftAnnounceMessage { + private int messageId; + private long time; + + private StringBuilder text; + + boolean isValid() { + if (messageId < 1) { + return false; + } + long dt = System.currentTimeMillis() - time; + return dt <= 60_000 && dt >= 0; + } + + protected JoinLeftAnnounceMessage(String firstMessage) { + text = new StringBuilder(firstMessage); + time = System.currentTimeMillis(); + sendAnnounceMessage(); + } + + private void sendAnnounceMessage() { + if (bot == null) { + messageId = -1; + return; + } + SendResponse response; + try { + response = bot.execute(new SendMessage(CHAT_ID, text.toString())); + + } catch (RuntimeException e) { + messageId = -1; + return; + } + if(response.isOk() == false){ + messageId = -1; + return; + } + messageId = response.message().messageId(); + } + + protected JoinLeftAnnounceMessage(){ + messageId = 0; + time = 0; + text = new StringBuilder(); + } //dummy + private void addLines(List messages) { + for (String message : messages) { + text.append('\n').append(message); + } + updateAnnounceMessage(); + } + + private void updateAnnounceMessage() { + if(!isValid()){ + plugin.logger.error("message should only push to a valid object"); + return; + } + if (bot == null) { + messageId = -1; + return; + } + try { + bot.execute(new EditMessageText(CHAT_ID, messageId, text.toString())); + } catch (RuntimeException ignored) {} + } + } + + private JoinLeftAnnounceMessage currentAnnounce = new JoinLeftAnnounceMessage(); + private LinkedBlockingQueue announceQueue = new LinkedBlockingQueue<>(); + + private void joinLeftAnnounce(String message){ + announceQueue.add(message); + } + }