398 lines
15 KiB
Python
Executable file
398 lines
15 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
from telethon.sync import TelegramClient
|
|
from telethon import events
|
|
import re
|
|
import logging
|
|
import time
|
|
import sys
|
|
from telethon.tl.functions.messages import GetInlineBotResultsRequest, SendInlineBotResultRequest, \
|
|
SendMessageRequest, SetTypingRequest
|
|
from telethon.tl.types import SendMessageTypingAction, SendMessageCancelAction
|
|
from telethon.tl.types import PeerUser, PeerChat, PeerChannel, User as _User, Chat as _Chat, Channel as _Channel, \
|
|
InputPeerChannel, InputPeerChat, InputPeerUser
|
|
from telethon.errors.rpcbaseerrors import RPCError
|
|
import asyncio
|
|
|
|
from game import Game
|
|
from card import GREY_SET_ID
|
|
|
|
# Shared game state used by the inline-query routine and the message handler.
game = Game()

from config import api_id, api_hash, PHONE, session_name, unobot_username, unobot_usernames, \
    default_delay, print_cards, disable_all_commands, game_consts

game.delay = default_delay

# When True, safety_check() only lets whitelisted chat ids through.
SAFE_MODE = False

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

client = TelegramClient(session_name, api_id, api_hash, sequential_updates=True, use_ipv6=False)
while not client.start(phone=PHONE):
    logger.info("Start failed. Retrying...")
    time.sleep(5)
logger.info("Started!")

# Look our own account up once and reuse it for both attributes.
_me = client.get_me()
my_username = _me.username
my_firstname = _me.first_name

# parse game_consts
# The values are later passed to re.search() as patterns, so the account
# names are escaped before being substituted for the placeholders.
for key in game_consts:
    pattern = game_consts[key]
    for placeholder, value in (('{username}', my_username), ('{firstname}', my_firstname)):
        # An account may have no username (None); str.replace() would raise
        # TypeError on it, so a placeholder without a value is left as is.
        if value:
            pattern = pattern.replace(placeholder, re.escape(value))
    game_consts[key] = pattern

# Walk the dialog list once; the results themselves are not used here.
logger.info("Getting dialogs")
for dialog in client.iter_dialogs():
    pass
logger.info("Done getting dialogs")

# Input entity of the UNO bot and the chat of the current game (set by the
# message handler once a game is joined).
unobot = client.get_input_entity(unobot_username)
unochat = None
|
|
|
|
async def inline_query(fail=None):
    """Query unobot inline for our hand, then play one card in ``unochat``.

    The routine shows a typing status, asks ``unobot`` for its inline
    results, registers every result with ``game`` (grey cards via
    ``game.add_grey_card``, the rest via ``game.add_card``), optionally waits
    ``game.delay`` seconds, and sends the result chosen by
    ``game.play_card()``. When no card can be played it sends ``/leave``.

    Args:
        fail: Number of retries already spent on a malformed result id
            (``None`` on the first call). After five retries the routine
            gives up.

    Returns:
        ``True`` once the send attempts are finished and the deck has been
        rotated; ``None`` when the bot gave no usable results, no card could
        be played, or the retry budget ran out.
    """
    TYPING_INTERVAL = 5  # seconds between typing-status refreshes

    async def set_typing(cancel=False):
        """Show (or, with ``cancel``, clear) the typing status in ``unochat``."""
        action = SendMessageCancelAction() if cancel else SendMessageTypingAction()
        try:
            await client(SetTypingRequest(peer=unochat, action=action))
        except Exception as err:
            # The typing status is cosmetic; never let it abort the turn.
            print(err)

    async def typing_sleep(seconds):
        """Sleep ``int(seconds)`` seconds, refreshing the typing status.

        Telegram cancels the typing notification after several seconds, so
        it is re-sent after every full interval while time remains.
        """
        remaining = int(seconds)
        while remaining > 0:
            nap = min(remaining, TYPING_INTERVAL)
            await asyncio.sleep(nap)
            remaining -= nap
            if remaining > 0:
                await set_typing()

    await set_typing()

    # Fetch the inline results, retrying on RPC errors.
    bot_results = None
    for _ in range(10):
        try:
            bot_results = await client(GetInlineBotResultsRequest(
                unobot, unochat, '', ''
            ))
        except RPCError:
            await asyncio.sleep(1)
        else:
            break

    # bot_results stays None when every attempt failed.
    if bot_results is None or not bot_results.results:
        logger.critical('Bad inline result from bot')
        print(bot_results)
        return None

    query_id = bot_results.query_id
    for result in bot_results.results:
        # is it a grey card? if so, get it.
        try:
            if hasattr(result.document.attributes[1], 'stickerset'):
                try:
                    (result_id, anti_cheat) = result.id.split(':')
                except ValueError:
                    pass
                else:
                    if len(result_id) == 36:
                        # uuid result for grey cards
                        sset = result.document.attributes[1].stickerset
                        if str(sset.id) == GREY_SET_ID:
                            game.add_grey_card(result.document.id)
                            continue
        except (AttributeError, IndexError):
            # Result without a document / sticker attribute: not a grey card.
            pass
        except Exception:
            logger.exception("while getting grey cards")

        # get ordinary cards
        try:
            (result_id, anti_cheat) = result.id.split(':')
        except ValueError:
            # Malformed id: start over, at most five times.
            # NOTE(review): cards added earlier in this pass are added again
            # by the retry — confirm game.add_card tolerates that.
            attempts = 0 if fail is None else fail
            if attempts >= 5:
                return None
            await asyncio.sleep(1)
            return await inline_query(fail=attempts + 1)
        game.add_card(result_id, anti_cheat)

    if game.delay:
        await typing_sleep(game.delay)
    if print_cards:
        print(game.print_cards())

    callback_id = game.play_card()
    if not callback_id:
        print(unochat, 'Error: No card can be played. Leaving game')
        await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username)))
        return None

    # Send the chosen result; stop at the first attempt that succeeds.
    for _ in range(6):
        try:
            await client(SendInlineBotResultRequest(
                unochat,
                query_id,
                "{}:{}".format(callback_id, game.anti_cheat)
            ))
        except Exception as err:
            logger.critical('Exception: {}'.format(err))
        else:
            break

    await set_typing(cancel=True)
    game.rotate_deck()
    return True
|
|
|
|
|
|
def safety_check(chat_id, force=False):
    """Tell whether the bot may act in ``chat_id``.

    Every chat passes unless ``SAFE_MODE`` is on or ``force`` is true; in
    that case only the whitelisted chat ids pass.
    """
    if not (SAFE_MODE or force):
        return True
    return chat_id in (-100000000000,)
|
|
|
|
def commandify(text, my_commands=True, wild_card=True):
    """Parse a ``/command`` or ``/command@username`` message.

    Returns ``[command, username, args]`` on success and ``[None]`` otherwise.

    * ``wild_card``: when false, a bare ``/command`` (no ``@username``) is
      rejected; when true it is attributed to ``my_username``.
    * ``my_commands``: when ``True``, commands addressed to a username other
      than ``my_username`` are rejected.
    """
    tokens = text.split()
    if not tokens:
        return [None]
    head, rest = tokens[0], tokens[1:]

    # Bare form: "/command"
    bare = re.match(r'/([^@]+$)', head)
    if bare:
        if not wild_card:
            return [None]
        return [bare.group(1), my_username, rest]

    # Addressed form: "/command@username"
    addressed = re.match(r'/([^@]+)@([^@]+)$', head)
    if not addressed:
        return [None]
    command, username = addressed.groups()
    if username != my_username and my_commands == True:
        return [None]
    return [command, username, rest]
|
|
|
|
def get_peer_id(peer, reverse=False):
    '''
    Convert between Telegram peer objects and "-100"-prefixed integer ids.

    reverse = False:
        peer: telethon.tl.types.PeerUser, PeerChat, PeerChannel (or their
              InputPeer* variants) / User, Chat, Channel
        return int: -100xxxx for channels and chats, the plain id for users;
               None (after printing a warning) for anything else
    reverse = True:
        peer: int (-100xxxx)
        return int (xxxx); values without a numeric "-100" prefix are
               returned unchanged
    '''
    if reverse:
        marked = str(peer)
        if marked.startswith("-100"):
            try:
                # Return an int as documented, not the raw string slice.
                return int(marked[4:])
            except ValueError:
                # Nothing numeric after the prefix: hand the input back.
                return peer
        return peer

    peerid = None
    if isinstance(peer, (PeerChannel, InputPeerChannel)):
        peerid = int('-100{}'.format(peer.channel_id))
    elif isinstance(peer, (_Channel, _Chat)):
        peerid = int('-100{}'.format(peer.id))
    elif isinstance(peer, (PeerChat, InputPeerChat)):
        peerid = int('-100{}'.format(peer.chat_id))
    elif isinstance(peer, (PeerUser, InputPeerUser)):
        peerid = peer.user_id
    elif isinstance(peer, _User):
        peerid = peer.id
    else:
        print("Error: ", peer)
    if not peerid:
        print('W: Peer_id is none')
    return peerid
|
|
|
|
|
|
class EmptyChat:
    """Placeholder chat object that only carries a ``title``."""

    def __init__(self, title=None):
        # Title of the chat; None when not given.
        self.title = title
|
|
|
|
class EmptyUser:
    """Placeholder user object carrying only name and username fields."""

    def __init__(self, first_name="None", last_name=None, username=None):
        # Note the default first name is the string "None", not the None object.
        self.first_name, self.last_name, self.username = first_name, last_name, username
|
|
|
|
def display_username(user, atuser=False, shorten=False):
    """Build a printable name for ``user``.

    Args:
        user: Object that may carry ``first_name``, ``last_name`` and
            ``username`` attributes. Missing attributes (or ``user`` being
            ``None``) are treated as empty instead of raising.
        atuser: Prefix the username with ``@`` inside the parentheses.
        shorten: Return only the name, without the username suffix.

    Returns:
        ``"First Last (username)"`` with whichever parts are present; an
        empty string when nothing is known about the user.
    """
    first_name = getattr(user, 'first_name', None)
    last_name = getattr(user, 'last_name', None)
    username = getattr(user, 'username', None)

    # Join only the parts that exist so a missing first name cannot end up
    # as None in a string concatenation.
    name = " ".join(part for part in (first_name, last_name) if part)
    if shorten or not username:
        return name

    handle = ("(@{})" if atuser else "({})").format(username)
    return "{} {}".format(name, handle) if name else handle
|
|
|
|
@client.on(events.NewMessage)
async def new_msg_handler(event):
    """Handle every new message: user commands first, then unobot's announcements.

    Only text messages without media, in chats where ``event.is_channel`` is
    true and that pass ``safety_check``, are processed. The handler rebinds
    the module globals ``unochat``, ``unobot_username`` and ``unobot`` to
    follow the chat and bot of the current game.

    NOTE(review): the nesting below was reconstructed from a copy of the file
    with indentation stripped — confirm the branch structure against the
    original before relying on it.
    """
    global unochat
    global unobot_username, unobot
    msg = event.message
    group = await event.get_chat()
    user = await event.get_sender()
    if event.is_channel and msg.message and (not msg.media):
        # Text handler
        if not safety_check(get_peer_id(msg.to_id)):
            return
        logger.info("Group - {} - {}: {}".format(group.title, display_username(user), msg.message))
        # react to commands
        if not disable_all_commands:
            # wild_card=False: only commands written as /command@username are accepted.
            c = commandify(event.raw_text, wild_card=False)
            if c[0]:
                if c[0] == 'hello':
                    await event.reply('hi!')
                if c[0] in ['startgame', 'start', 'join']:
                    if game.is_playing:
                        await event.reply("I'm playing right now.")
                    else:
                        unochat = msg.to_id
                        # get unobot name from args
                        if len(c) == 3 and len(c[2]) >= 1 and c[2][0].endswith('bot'):
                            unobot_username = c[2][0]
                            unobot = await client.get_input_entity(unobot_username)
                        game.join_game(get_peer_id(unochat))
                        await client(SendMessageRequest(unochat, "/join@{}".format(unobot_username)))
                elif c[0] in ['stopgame', 'stop', 'leave']:
                    if game.is_playing:
                        game.leave_game(get_peer_id(unochat))
                        game.stop_game()
                        await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username)))
                    elif game.joined and get_peer_id(unochat) in game.joined:
                        # Joined but the game has not started yet.
                        game.leave_game(get_peer_id(unochat))
                        await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username)))
                    else:
                        await event.reply("I'm not playing right now.")
                elif c[0] in ['wait', 'delay']:
                    if game.delay or (not game.is_playing):
                        await event.reply("Nothing to do.")
                    else:
                        # Fixed delay of 8 seconds before each move.
                        game.delay = 8
                        await event.reply("OK. {} seconds of delay has been set.".format(game.delay))
                elif c[0] in ['nowait', 'nodelay']:
                    if game.delay and game.is_playing:
                        # Build the reply before clearing the delay it reports.
                        myreply = "OK. {} seconds of delay has been removed.".format(game.delay)
                        game.delay = None
                        await event.reply(myreply)
                    else:
                        await event.reply("Nothing to do.")
                # NOTE(review): placement of this return (end of the command
                # branch) is reconstructed — confirm.
                return
        # react to unobot
        if user.username and user.username in unobot_usernames:
            if re.search(game_consts['myturn'], msg.message):
                if re.search(game_consts['start'], msg.message):
                    # I'm the first player
                    if not game.is_playing:
                        unochat = msg.to_id
                        unobot_username = user.username
                        unobot = await client.get_input_entity(unobot_username)
                        # NOTE(review): nesting of this check inside the
                        # not-playing branch is reconstructed — confirm.
                        if game.joined and get_peer_id(unochat) in game.joined:
                            logger.info('Bot: Game started. I\'m the first player.')
                            game.start_game()
                logger.info('Bot: It\'s my turn.')
                if game.joined and get_peer_id(msg.to_id) in game.joined:
                    if not game.is_playing:
                        logger.info('Bot: Game started a long time ago. I joined midway.')
                        unochat = msg.to_id
                        unobot_username = user.username
                        unobot = await client.get_input_entity(unobot_username)
                        game.start_game()
                    # Play the turn in the chat we joined.
                    await inline_query()
                else:
                    if not game.is_playing:
                        # Not registered in this chat and idle: join on the fly and play.
                        logger.info('I\'m not playing in {} - {}, play anyway.'.format(group.title, get_peer_id(msg.to_id)))
                        unochat = msg.to_id
                        unobot_username = user.username
                        unobot = await client.get_input_entity(unobot_username)
                        game.join_game(get_peer_id(unochat))
                        game.start_game()
                        await inline_query()
                    else:
                        # Already playing: leave the game in the chat this message came from.
                        logger.info('I\'m not playing in {} - {}'.format(group.title, get_peer_id(msg.to_id)))
                        await client(SendMessageRequest(msg.to_id, "/leave@{}".format(user.username)))
            elif re.search(game_consts['end'], msg.message):
                if game.is_playing:
                    logger.info('Bot: Game ended.')
                    game.clear_deck()
                    game.leave_game(get_peer_id(unochat))
                    game.stop_game()
                    # Restore the configured delay for the next game.
                    game.delay = default_delay
            elif re.search(game_consts['create'], msg.message):
                if not game.is_playing:
                    logger.info('Bot: New game created.')
                    if default_delay:
                        await asyncio.sleep(default_delay)
                    unochat = msg.to_id
                    unobot_username = user.username
                    game.join_game(get_peer_id(unochat))
                    await client(SendMessageRequest(unochat, "/join@{}".format(unobot_username)))
            elif re.search(game_consts['start'], msg.message):
                if not game.is_playing:
                    unochat = msg.to_id
                    unobot_username = user.username
                    unobot = await client.get_input_entity(unobot_username)
                    # NOTE(review): nesting of this if/else inside the
                    # not-playing branch is reconstructed — confirm.
                    if game.joined and get_peer_id(unochat) in game.joined:
                        logger.info('Bot: Game started.')
                        game.start_game()
                    else:
                        await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username)))
            elif re.search(game_consts['win'], msg.message):
                if game.is_playing:
                    logger.info('Bot: I win.')
                    game.clear_deck()
                    game.leave_game(get_peer_id(unochat))
                    game.stop_game()
|
|
|
|
|
|
async def main() -> None:
    """Keep the script alive until the Telegram client disconnects."""
    await client.run_until_disconnected()
|
|
|
|
# Script entry: run main() on the event loop until the client disconnects.
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
except KeyboardInterrupt:
    # Ctrl-C: disconnect the client before exiting.
    client.disconnect()
|