#!/usr/bin/env python3 # -*- coding: utf-8 -*- from telethon.sync import TelegramClient from telethon import events import re import logging import time import sys from telethon.tl.functions.messages import GetInlineBotResultsRequest, SendInlineBotResultRequest, \ SendMessageRequest, SetTypingRequest from telethon.tl.types import SendMessageTypingAction, SendMessageCancelAction from telethon.tl.types import PeerUser, PeerChat, PeerChannel, User as _User, Chat as _Chat, Channel as _Channel, \ InputPeerChannel, InputPeerChat, InputPeerUser from telethon.errors.rpcbaseerrors import RPCError import asyncio from game import Game game = Game() from config import api_id, api_hash, PHONE, session_name, unobot_username, unobot_usernames, \ default_delay, print_cards, disable_all_commands, game_consts game.delay = default_delay SAFE_MODE = False logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) client = TelegramClient(session_name, api_id, api_hash, sequential_updates=True, use_ipv6=False) while not client.start(phone=PHONE): logger.info("Start failed. Retrying...") time.sleep(5) logger.info("Started!") my_username = client.get_me().username my_firstname = client.get_me().first_name # parse game_consts for key in game_consts: game_consts[key] = game_consts[key].replace('{username}', my_username) game_consts[key] = game_consts[key].replace('{firstname}', my_firstname) logger.info("Getting dialogs") for dialog in client.iter_dialogs(): pass logger.info("Done getting dialogs") unobot = client.get_input_entity(unobot_username) unochat = None async def inline_query(fail=None): """ This part handles interaction with unobot. """ # typing @unobot and a space :) async def set_typing(cancel=False): '''let others see you are typing''' if cancel: try: await client(SetTypingRequest( peer = unochat, action = SendMessageCancelAction() )) except Exception as err: print(err) else: try: await client(SetTypingRequest( peer = unochat, action = SendMessageTypingAction() )) except Exception as err: print(err) async def typing_sleep(seconds): ''' Telegram cancels your typing notification in several secs we also call set_typing in the beginning ''' INTERVAL = 5 seconds = int(seconds) if seconds <= 0: pass else: if seconds <= INTERVAL: await asyncio.sleep(INTERVAL) else: rounds = int(seconds/INTERVAL) for i in range(rounds): await asyncio.sleep(INTERVAL) await set_typing() assert (seconds - rounds * INTERVAL) > 0 await asyncio.sleep(int(seconds - rounds * INTERVAL)) await set_typing() for tries in range(10): try: bot_results = await client(GetInlineBotResultsRequest( unobot, unochat, '', '' )) except RPCError: await asyncio.sleep(1) else: break if bot_results.results: query_id = bot_results.query_id for result in bot_results.results: # is it a grey card? if so, get it. try: if hasattr(result.document.attributes[1], 'stickerset'): try: (result_id, anti_cheat) = result.id.split(':') except: pass else: if len(result_id) == 36: # uuid result for grey cards sset = result.document.attributes[1].stickerset game.add_grey_card(sset.id, result.document.id) continue except (AttributeError, IndexError): pass except Exception as err: logger.exception("while getting grey cards") # get ordinary cards try: (result_id, anti_cheat) = result.id.split(':') except ValueError: if fail is None: fail = 1 await asyncio.sleep(1) await inline_query(fail=fail) return elif fail < 5: fail += 1 await asyncio.sleep(1) await inline_query(fail=fail) return else: return game.add_card(result_id, anti_cheat) if game.delay: await typing_sleep(game.delay) if print_cards: print(game.print_cards()) callback_id = game.play_card() if not callback_id: callback_id = 'draw' #await client(SendMessageRequest(unochat, 'Error: No card can be played. Leaving game')) print(unochat, 'Error: No card can be played. Leaving game') await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username))) return for tries in range(6): try: await client(SendInlineBotResultRequest( unochat, query_id, "{}:{}".format(callback_id, game.anti_cheat) )) except Exception as err: logger.critical('Exception: {}'.format(err)) #await client(SendMessageRequest(unochat, 'Exception: {}'.format(err))) else: break await set_typing(cancel=True) game.rotate_deck() return True else: logger.critical('Bad inline result from bot') print(bot_results) return None def safety_check(chat_id, force=False): if SAFE_MODE or force: safe_ids = [-100000000000] if chat_id in safe_ids: return True else: return False else: return True def commandify(text, my_commands=True, wild_card=True): args = text.split() if not args: return [None] match = re.match(r'/([^@]+$)', args[0]) if match: command = match.group(1) if not wild_card: return [None] username = my_username args = args[1:] return [command, username, args] else: match = re.match(r'/([^@]+)@([^@]+)$', args[0]) if match: username = match.group(2) command = match.group(1) args = args[1:] if username != my_username and my_commands == True: return [None] else: return [command, username, args] else: return [None] def get_peer_id(peer, reverse=False): ''' reverse = False: peer: telethon.tl.types.PeerUser, PeerChat, PeerChannel / User, Chat, Channel return int (-100xxxx) reverse = True: peer: int (-100xxxx) return int (xxxx) ''' if reverse: str_peer = str(peer) if str_peer.startswith("-100"): orig_id = str_peer[4:] return orig_id else: return peer else: peerid = None if type(peer) in (PeerChannel, InputPeerChannel): peerid = getattr(peer, 'channel_id') peerid = int('-100{}'.format(peerid)) elif type(peer) in [ _Channel, _Chat]: peerid = getattr(peer, 'id') peerid = int('-100{}'.format(peerid)) elif type(peer) in (PeerChat, InputPeerChat): peerid = getattr(peer, 'chat_id') peerid = int('-100{}'.format(peerid)) elif type(peer) in (PeerUser, InputPeerUser): peerid = getattr(peer, 'user_id') elif type(peer) is _User: peerid = getattr(peer, 'id') else: print("Error: ", peer) if not peerid: print('W: Peer_id is none') return peerid class EmptyChat: def __init__(self, title=None): self.title = title class EmptyUser: def __init__(self, first_name="None", last_name=None, username=None): self.first_name = first_name self.last_name = last_name self.username = username def display_username(user, atuser=False, shorten=False): if user.first_name and user.last_name: name = "{} {}".format(user.first_name, user.last_name) else: name = user.first_name if shorten: return name if user.username: if atuser: name += " (@{})".format(user.username) else: name += " ({})".format(user.username) return name @client.on(events.NewMessage) async def new_msg_handler(event): global unochat global unobot_username, unobot msg = event.message group = await event.get_chat() user = await event.get_sender() if event.is_channel and msg.message and (not msg.media): # Text handler if not safety_check(get_peer_id(msg.to_id)): return logger.info("Group - {} - {}: {}".format(group.title, display_username(user), msg.message)) # react to commands if not disable_all_commands: c = commandify(event.raw_text, wild_card=False) if c[0]: if c[0] == 'hello': await event.reply('hi!') if c[0] in ['startgame', 'start', 'join']: if game.is_playing: await event.reply("I'm playing right now.") else: unochat = msg.to_id # get unobot name from args if len(c) == 3 and len(c[2]) >= 1 and c[2][0].endswith('bot'): unobot_username = c[2][0] unobot = await client.get_input_entity(unobot_username) game.join_game(get_peer_id(unochat)) await client(SendMessageRequest(unochat, "/join@{}".format(unobot_username))) elif c[0] in ['stopgame', 'stop', 'leave']: if game.is_playing: game.leave_game(get_peer_id(unochat)) game.stop_game() await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username))) elif game.joined and get_peer_id(unochat) in game.joined: game.leave_game(get_peer_id(unochat)) await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username))) else: await event.reply("I'm not playing right now.") elif c[0] in ['wait', 'delay']: if game.delay or (not game.is_playing): await event.reply("Nothing to do.") else: game.delay = 8 await event.reply("OK. {} seconds of delay has been set.".format(game.delay)) elif c[0] in ['nowait', 'nodelay']: if game.delay and game.is_playing: myreply = "OK. {} seconds of delay has been removed.".format(game.delay) game.delay = None await event.reply(myreply) else: await event.reply("Nothing to do.") return # react to unobot if user.username and user.username in unobot_usernames: if re.search(game_consts['myturn'], msg.message): if re.search(game_consts['start'], msg.message): # I'm the first player if not game.is_playing: unochat = msg.to_id unobot_username = user.username unobot = await client.get_input_entity(unobot_username) if game.joined and get_peer_id(unochat) in game.joined: logger.info('Bot: Game started. I\'m the first player.') game.start_game() logger.info('Bot: It\'s my turn.') if game.joined and get_peer_id(msg.to_id) in game.joined: if not game.is_playing: logger.info('Bot: Game started a long time ago. I joined midway.') unochat = msg.to_id unobot_username = user.username unobot = await client.get_input_entity(unobot_username) game.start_game() await inline_query() else: if not game.is_playing: logger.info('I\'m not playing in {} - {}, play anyway.'.format(group.title, get_peer_id(msg.to_id))) unochat = msg.to_id unobot_username = user.username unobot = await client.get_input_entity(unobot_username) game.join_game(get_peer_id(unochat)) game.start_game() await inline_query() else: logger.info('I\'m not playing in {} - {}'.format(group.title, get_peer_id(msg.to_id))) await client(SendMessageRequest(msg.to_id, "/leave@{}".format(user.username))) elif re.search(game_consts['end'], msg.message): if game.is_playing: logger.info('Bot: Game ended.') game.clear_deck() game.leave_game(get_peer_id(unochat)) game.stop_game() game.delay = default_delay elif re.search(game_consts['create'], msg.message): if not game.is_playing: logger.info('Bot: New game created.') if default_delay: await asyncio.sleep(default_delay) unochat = msg.to_id unobot_username = user.username game.join_game(get_peer_id(unochat)) await client(SendMessageRequest(unochat, "/join@{}".format(unobot_username))) elif re.search(game_consts['start'], msg.message): if not game.is_playing: unochat = msg.to_id unobot_username = user.username unobot = await client.get_input_entity(unobot_username) if game.joined and get_peer_id(unochat) in game.joined: logger.info('Bot: Game started.') game.start_game() else: await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username))) elif re.search(game_consts['win'], msg.message): if game.is_playing: logger.info('Bot: I win.') game.clear_deck() game.leave_game(get_peer_id(unochat)) game.stop_game() async def main(): await client.run_until_disconnected() loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) except KeyboardInterrupt: client.disconnect()