From 6204868a18dc17426928b98b2a45004da7180c72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Thu, 19 May 2016 20:52:50 +0200 Subject: [PATCH] separate game logic from bot interface, introduce exceptions instead of boolean returns, remove repetitive code, begin unit tests, improve docstrings, update to python-telegram-bot==4.1.1, add ponyorm settings classes (unused) --- bot.py | 500 ++++++++++++++++++++++---------------- card.py | 10 +- chat_setting.py | 20 ++ database.py | 23 ++ deck.py | 21 +- errors.py | 37 +++ game.py | 15 +- game_manager.py | 115 +++++---- player.py | 30 ++- results.py | 21 +- test/test.py | 41 ---- test/test_game_manager.py | 73 ++++++ test/test_player.py | 158 ++++++++++++ user_setting.py | 30 +++ 14 files changed, 755 insertions(+), 339 deletions(-) create mode 100644 chat_setting.py create mode 100644 database.py create mode 100644 errors.py delete mode 100644 test/test.py create mode 100644 test/test_game_manager.py create mode 100644 test/test_player.py create mode 100644 user_setting.py diff --git a/bot.py b/bot.py index 1fffc37..2f87297 100644 --- a/bot.py +++ b/bot.py @@ -32,8 +32,16 @@ from telegram.utils.botan import Botan from game_manager import GameManager from credentials import TOKEN, BOTAN_TOKEN from start_bot import start_bot -from results import * -from utils import * +from results import (add_call_bluff, add_choose_color, add_draw, add_gameinfo, + add_no_game, add_not_started, add_other_cards, add_pass, + add_card) +from utils import display_name +import card as c +from errors import (NoGameInChatError, LobbyClosedError, AlreadyJoinedError, + NotEnoughPlayersError, DeckEmptyError) +from database import db_session + +TIMEOUT = 2.5 logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', @@ -83,8 +91,9 @@ source_text = ("This bot is Free Software and licensed under the AGPL. " @run_async def send_async(bot, *args, **kwargs): + """Send a message asynchronously""" if 'timeout' not in kwargs: - kwargs['timeout'] = 2.5 + kwargs['timeout'] = TIMEOUT try: bot.sendMessage(*args, **kwargs) @@ -94,8 +103,9 @@ def send_async(bot, *args, **kwargs): @run_async def answer_async(bot, *args, **kwargs): + """Answer an inline query asynchronously""" if 'timeout' not in kwargs: - kwargs['timeout'] = 2.5 + kwargs['timeout'] = TIMEOUT try: bot.answerInlineQuery(*args, **kwargs) @@ -104,89 +114,97 @@ def answer_async(bot, *args, **kwargs): def error(bot, update, error): - """ Simple error handler """ + """Simple error handler""" logger.exception(error) def new_game(bot, update): - """ Handler for the /new command """ + """Handler for the /new command""" chat_id = update.message.chat_id + if update.message.chat.type == 'private': help(bot, update) + else: game = gm.new_game(update.message.chat) game.owner = update.message.from_user send_async(bot, chat_id, text="Created a new game! Join the game with /join " "and start the game with /start") + if botan: botan.track(update.message, 'New games') def join_game(bot, update): - """ Handler for the /join command """ - chat_id = update.message.chat_id + """Handler for the /join command""" + chat = update.message.chat + if update.message.chat.type == 'private': help(bot, update) - else: - try: - game = gm.chatid_games[chat_id][-1] - if not game.open: - send_async(bot, chat_id, text="The lobby is closed") - return - except (KeyError, IndexError): - pass + return - joined = gm.join_game(chat_id, update.message.from_user) - if joined: - send_async(bot, chat_id, - text="Joined the game", - reply_to_message_id=update.message.message_id) - elif joined is None: - send_async(bot, chat_id, - text="No game is running at the moment. " - "Create a new game with /new", - reply_to_message_id=update.message.message_id) - else: - send_async(bot, chat_id, - text="You already joined the game. Start the game " - "with /start", - reply_to_message_id=update.message.message_id) + try: + gm.join_game(update.message.from_user, chat) + + except LobbyClosedError: + send_async(bot, chat.id, text="The lobby is closed") + + except NoGameInChatError: + send_async(bot, chat.id, + text="No game is running at the moment. " + "Create a new game with /new", + reply_to_message_id=update.message.message_id) + + except AlreadyJoinedError: + send_async(bot, chat.id, + text="You already joined the game. Start the game " + "with /start", + reply_to_message_id=update.message.message_id) + else: + send_async(bot, chat.id, + text="Joined the game", + reply_to_message_id=update.message.message_id) def leave_game(bot, update): - """ Handler for the /leave command """ - chat_id = update.message.chat_id + """Handler for the /leave command""" + chat = update.message.chat user = update.message.from_user - players = gm.userid_players.get(user.id, list()) - for player in players: - if player.game.chat.id == chat_id: - game = player.game - break - else: - send_async(bot, chat_id, text="You are not playing in a game in " + + player = gm.player_for_user_in_chat(user, chat) + + if player is None: + send_async(bot, chat.id, text="You are not playing in a game in " "this group.", reply_to_message_id=update.message.message_id) return + game = player.game user = update.message.from_user - if len(game.players) < 3: - gm.end_game(chat_id, user) - send_async(bot, chat_id, text="Game ended!") + try: + gm.leave_game(user, chat) + + except NoGameInChatError: + send_async(bot, chat.id, text="You are not playing in a game in " + "this group.", + reply_to_message_id=update.message.message_id) + + except NotEnoughPlayersError: + gm.end_game(chat, user) + send_async(bot, chat.id, text="Game ended!") + else: - if gm.leave_game(user, chat_id): - send_async(bot, chat_id, - text="Okay. Next Player: " + - display_name(game.current_player.user), - reply_to_message_id=update.message.message_id) - else: - send_async(bot, chat_id, text="You are not playing in a game in " - "this group.", - reply_to_message_id=update.message.message_id) + send_async(bot, chat.id, + text="Okay. Next Player: " + + display_name(game.current_player.user), + reply_to_message_id=update.message.message_id) +@run_async def select_game(bot, update): + """Handler for callback queries to select the current game""" chat_id = int(update.callback_query.data) user_id = update.callback_query.from_user.id @@ -196,8 +214,9 @@ def select_game(bot, update): gm.userid_current[user_id] = player break else: - send_async(bot, update.callback_query.message.chat_id, - text="Game not found :(") + bot.sendMessage(update.callback_query.message.chat_id, + text="Game not found.", + timeout=TIMEOUT) return back = [[InlineKeyboardButton(text='Back to last group', @@ -206,7 +225,8 @@ def select_game(bot, update): bot.answerCallbackQuery(update.callback_query.id, text="Please switch to the group you selected!", show_alert=False, - timeout=2.5) + timeout=TIMEOUT) + bot.editMessageText(chat_id=update.callback_query.message.chat_id, message_id=update.callback_query.message.message_id, text="Selected group: %s\n" @@ -215,87 +235,115 @@ def select_game(bot, update): % gm.userid_current[user_id].game.chat.title, reply_markup=InlineKeyboardMarkup(back), parse_mode=ParseMode.HTML, - timeout=2.5) + timeout=TIMEOUT) def status_update(bot, update): - """ Remove player from game if user leaves the group """ + """Remove player from game if user leaves the group""" + chat = update.message.chat if update.message.left_chat_member: try: - chat_id = update.message.chat_id user = update.message.left_chat_member except KeyError: return - if gm.leave_game(user, chat_id): - send_async(bot, chat_id, text="Removing %s from the game" + try: + gm.leave_game(user, chat) + except NoGameInChatError: + pass + except NotEnoughPlayersError: + gm.end_game(chat, user) + send_async(bot, chat.id, text="Game ended!") + else: + send_async(bot, chat.id, text="Removing %s from the game" % display_name(user)) def start_game(bot, update, args): - """ Handler for the /start command """ + """Handler for the /start command""" if update.message.chat.type != 'private': - # Show the first card - chat_id = update.message.chat_id + chat = update.message.chat + try: - game = gm.chatid_games[chat_id][-1] + game = gm.chatid_games[chat.id][-1] except (KeyError, IndexError): - send_async(bot, chat_id, text="There is no game running in this " + send_async(bot, chat.id, text="There is no game running in this " "chat. Create a new one with /new") return - if game.current_player is None or \ - game.current_player is game.current_player.next: - send_async(bot, chat_id, text="At least two players must /join " + if game.started: + send_async(bot, chat.id, text="The game has already started") + + elif len(game.players) < 2: + send_async(bot, chat.id, text="At least two players must /join " "the game before you can start it") - elif game.started: - send_async(bot, chat_id, text="The game has already started") + else: game.play_card(game.last_card) game.started = True - bot.sendSticker(chat_id, - sticker=c.STICKERS[str(game.last_card)], - timeout=2.5) - send_async(bot, chat_id, - text="First player: %s\n" - "Use /close to stop people from joining the game." - % display_name(game.current_player.user)) + + @run_async + def send_first(): + """Send the first card and player""" + + bot.sendSticker(chat.id, + sticker=c.STICKERS[str(game.last_card)], + timeout=TIMEOUT) + + bot.sendMessage(chat.id, + text="First player: %s\n" + "Use /close to stop people from joining " + "the game." + % display_name(game.current_player.user), + timeout=TIMEOUT) + + send_first() + elif len(args) and args[0] == 'select': players = gm.userid_players[update.message.from_user.id] groups = list() for player in players: - groups.append([InlineKeyboardButton(text=player.game.chat.title, - callback_data= - str(player.game.chat.id))]) + title = player.game.chat.title + + if player is gm.userid_current[update.message.from_user.id]: + title = '- %s -' % player.game.chat.title + + groups.append( + [InlineKeyboardButton(text=title, + callback_data=str(player.game.chat.id))] + ) + send_async(bot, update.message.chat_id, - text='Please select the group you want to play in. ', + text='Please select the group you want to play in.', reply_markup=InlineKeyboardMarkup(groups)) + else: help(bot, update) def close_game(bot, update): - """ Handler for the /close command """ - chat_id = update.message.chat_id + """Handler for the /close command""" + chat = update.message.chat user = update.message.from_user - games = gm.chatid_games.get(chat_id) + games = gm.chatid_games.get(chat.id) if not games: - send_async(bot, chat_id, text="There is no running game") + send_async(bot, chat.id, text="There is no running game in this chat.") return game = games[-1] if game.owner.id == user.id: game.open = False - send_async(bot, chat_id, text="Closed the lobby. " + send_async(bot, chat.id, text="Closed the lobby. " "No more players can join this game.") return + else: - send_async(bot, chat_id, + send_async(bot, chat.id, text="Only the game creator (%s) can do that" % game.owner.first_name, reply_to_message_id=update.message.message_id) @@ -303,118 +351,115 @@ def close_game(bot, update): def open_game(bot, update): - """ Handler for the /open command """ - chat_id = update.message.chat_id + """Handler for the /open command""" + chat = update.message.chat user = update.message.from_user - games = gm.chatid_games.get(chat_id) + games = gm.chatid_games.get(chat.id) if not games: - send_async(bot, chat_id, text="There is no running game") + send_async(bot, chat.id, text="There is no running game in this chat.") return game = games[-1] if game.owner.id == user.id: game.open = True - send_async(bot, chat_id, text="Opened the lobby. " + send_async(bot, chat.id, text="Opened the lobby. " "New players may /join the game.") return else: - send_async(bot, chat_id, - text="Only the game creator (%s) can do that" + send_async(bot, chat.id, + text="Only the game creator (%s) can do that." % game.owner.first_name, reply_to_message_id=update.message.message_id) return def skip_player(bot, update): - """ Handler for the /skip command """ - chat_id = update.message.chat_id + """Handler for the /skip command""" + chat = update.message.chat user = update.message.from_user - games = gm.chatid_games.get(chat_id) - players = gm.userid_players.get(user.id) - if not games: - send_async(bot, chat_id, text="There is no running game") + player = gm.player_for_user_in_chat(user, chat) + if not player: + send_async(bot, chat.id, text="You are not playing in a game in this " + "chat.") return - if not players: - send_async(bot, chat_id, text="You are not playing") - return + game = player.game + skipped_player = game.current_player + next_player = game.current_player.next - for game in games: - for player in players: - if player in game.players: - started = game.current_player.turn_started - now = datetime.now() - delta = (now - started).seconds + started = skipped_player.turn_started + now = datetime.now() + delta = (now - started).seconds - if delta < game.current_player.waiting_time: - send_async(bot, chat_id, - text="Please wait %d seconds" - % (game.current_player.waiting_time - - delta), - reply_to_message_id= - update.message.message_id) - return + if delta < skipped_player.waiting_time: + send_async(bot, chat.id, + text="Please wait %d seconds" + % (skipped_player.waiting_time - delta), + reply_to_message_id=update.message.message_id) - elif game.current_player.waiting_time > 0: - game.current_player.anti_cheat += 1 - game.current_player.waiting_time -= 30 - game.current_player.cards.append(game.deck.draw()) - send_async(bot, chat_id, - text="Waiting time to skip this player has " - "been reduced to %d seconds.\n" - "Next player: %s" - % (game.current_player.waiting_time, - display_name( - game.current_player.next.user))) - game.turn() - return + elif skipped_player.waiting_time > 0: + skipped_player.anti_cheat += 1 + skipped_player.waiting_time -= 30 + try: + skipped_player.draw() + except DeckEmptyError: + pass - elif len(game.players) > 2: - send_async(bot, chat_id, - text="%s was skipped four times in a row " - "and has been removed from the game.\n" - "Next player: %s" - % (display_name(game.current_player.user), - display_name( - game.current_player.next.user))) + send_async(bot, chat.id, + text="Waiting time to skip this player has " + "been reduced to %d seconds.\n" + "Next player: %s" + % (skipped_player.waiting_time, + display_name(next_player.user))) + game.turn() - gm.leave_game(game.current_player.user, chat_id) - return - else: - send_async(bot, chat_id, - text="%s was skipped four times in a row " - "and has been removed from the game.\n" - "The game ended." - % display_name(game.current_player.user)) + else: + try: + gm.leave_game(skipped_player.user, chat) + send_async(bot, chat.id, + text="%s was skipped four times in a row " + "and has been removed from the game.\n" + "Next player: %s" + % (display_name(skipped_player.user), + display_name(next_player.user))) - gm.end_game(chat_id, game.current_player.user) - return + except NotEnoughPlayersError: + send_async(bot, chat.id, + text="%s was skipped four times in a row " + "and has been removed from the game.\n" + "The game ended." + % display_name(skipped_player.user)) + + gm.end_game(chat.id, skipped_player.user) def help(bot, update): - """ Handler for the /help command """ + """Handler for the /help command""" send_async(bot, update.message.chat_id, text=help_text, parse_mode=ParseMode.HTML, disable_web_page_preview=True) def source(bot, update): - """ Handler for the /help command """ + """Handler for the /help command""" send_async(bot, update.message.chat_id, text=source_text, parse_mode=ParseMode.HTML, disable_web_page_preview=True) def news(bot, update): - """ Handler for the /news command """ + """Handler for the /news command""" send_async(bot, update.message.chat_id, text="All news here: https://telegram.me/unobotupdates", disable_web_page_preview=True) def reply_to_query(bot, update): - """ Builds the result list for inline queries and answers to the client """ + """ + Handler for inline queries. + Builds the result list for inline queries and answers to the client. + """ results = list() playable = list() switch = None @@ -429,9 +474,11 @@ def reply_to_query(bot, update): else: if not game.started: add_not_started(results) + elif user_id == game.current_player.user.id: if game.choosing_color: add_choose_color(results) + add_other_cards(playable, player, results, game) else: if not player.drew: add_draw(player, results) @@ -443,19 +490,18 @@ def reply_to_query(bot, update): add_call_bluff(results) playable = player.playable_cards() - added_ids = list() + added_ids = list() # Duplicates are not allowed for card in sorted(player.cards): - add_play_card(game, card, results, - can_play=(card in playable and + add_card(game, card, results, + can_play=(card in playable and str(card) not in added_ids)) added_ids.append(str(card)) - if False or game.choosing_color: - add_other_cards(playable, player, results, game) elif user_id != game.current_player.user.id or not game.started: for card in sorted(player.cards): - add_play_card(game, card, results, can_play=False) + add_card(game, card, results, can_play=False) + else: add_gameinfo(game, results) @@ -470,13 +516,16 @@ def reply_to_query(bot, update): def process_result(bot, update): - """ Check the players actions and act accordingly """ + """ + Handler for chosen inline results. + Checks the players actions and acts accordingly. + """ try: user = update.chosen_inline_result.from_user player = gm.userid_current[user.id] game = player.game result_id = update.chosen_inline_result.result_id - chat_id = game.chat.id + chat = game.chat except KeyError: return @@ -491,103 +540,130 @@ def process_result(bot, update): elif len(result_id) == 36: # UUID result return elif int(anti_cheat) != last_anti_cheat: - send_async(bot, chat_id, + send_async(bot, chat.id, text="Cheat attempt by %s" % display_name(player.user)) return elif result_id == 'call_bluff': - reset_waiting_time(bot, chat_id, player) - do_call_bluff(bot, chat_id, game, player) + reset_waiting_time(bot, player) + do_call_bluff(bot, player) elif result_id == 'draw': - reset_waiting_time(bot, chat_id, player) - do_draw(game, player) + reset_waiting_time(bot, player) + do_draw(player) elif result_id == 'pass': game.turn() elif result_id in c.COLORS: game.choose_color(result_id) else: - reset_waiting_time(bot, chat_id, player) - do_play_card(bot, chat_id, game, player, result_id, user) + reset_waiting_time(bot, player) + do_play_card(bot, player, result_id) - if game in gm.chatid_games.get(chat_id, list()): - send_async(bot, chat_id, text="Next player: " + + if game in gm.chatid_games.get(chat.id, list()): + send_async(bot, chat.id, text="Next player: " + display_name(game.current_player.user)) -def reset_waiting_time(bot, chat_id, player): +def reset_waiting_time(bot, player): + """Resets waiting time for a player and sends a notice to the group""" + chat = player.game.chat + if player.waiting_time < 90: player.waiting_time = 90 - send_async(bot, chat_id, text="Waiting time for %s has been reset to " + send_async(bot, chat.id, text="Waiting time for %s has been reset to " "90 seconds" % display_name(player.user)) -def do_play_card(bot, chat_id, game, player, result_id, user): +def do_play_card(bot, player, result_id): + """Plays the selected card and sends an update to the group if needed""" card = c.from_str(result_id) - game.play_card(card) - player.cards.remove(card) + player.play(card) + game = player.game + chat = game.chat + user = player.user + if game.choosing_color: - send_async(bot, chat_id, text="Please choose a color") + send_async(bot, chat.id, text="Please choose a color") + if len(player.cards) == 1: - send_async(bot, chat_id, text="UNO!") + send_async(bot, chat.id, text="UNO!") + if len(player.cards) == 0: - send_async(bot, chat_id, text="%s won!" % user.first_name) - if len(game.players) < 3: - send_async(bot, chat_id, text="Game ended!") - gm.end_game(chat_id, user) - else: - gm.leave_game(user, chat_id) + send_async(bot, chat.id, text="%s won!" % user.first_name) + try: + gm.leave_game(user, chat) + except NotEnoughPlayersError: + send_async(bot, chat.id, text="Game ended!") + gm.end_game(chat, user) if botan: botan.track(Message(randint(1, 1000000000), user, datetime.now(), - Chat(chat_id, 'group')), + Chat(chat.id, 'group')), 'Played cards') -def do_draw(game, player): +def do_draw(bot, player): + """Does the drawing""" + game = player.game draw_counter_before = game.draw_counter - for n in range(game.draw_counter or 1): - player.cards.append(game.deck.draw()) - game.draw_counter = 0 - player.drew = True + + try: + player.draw() + except DeckEmptyError: + send_async(bot, player.game.chat.id, + text="There are no more cards in the deck.") + if (game.last_card.value == c.DRAW_TWO or game.last_card.special == c.DRAW_FOUR) and \ draw_counter_before > 0: game.turn() -def do_call_bluff(bot, chat_id, game, player): +def do_call_bluff(bot, player): + """Handles the bluff calling""" + game = player.game + chat = game.chat + if player.prev.bluffing: - send_async(bot, chat_id, text="Bluff called! Giving %d cards to %s" + send_async(bot, chat.id, text="Bluff called! Giving %d cards to %s" % (game.draw_counter, player.prev.user.first_name)) - for i in range(game.draw_counter): - player.prev.cards.append(game.deck.draw()) + + try: + player.prev.draw() + except DeckEmptyError: + send_async(bot, player.game.chat.id, + text="There are no more cards in the deck.") + else: - send_async(bot, chat_id, text="%s didn't bluff! Giving %d cards to %s" + game.draw_counter += 2 + send_async(bot, chat.id, text="%s didn't bluff! Giving %d cards to %s" % (player.prev.user.first_name, - game.draw_counter + 2, + game.draw_counter, player.user.first_name)) - for i in range(game.draw_counter + 2): - player.cards.append(game.deck.draw()) - game.draw_counter = 0 + try: + player.draw() + except DeckEmptyError: + send_async(bot, player.game.chat.id, + text="There are no more cards in the deck.") + game.turn() # Add all handlers to the dispatcher and run the bot -dp.addHandler(InlineQueryHandler(reply_to_query)) -dp.addHandler(ChosenInlineResultHandler(process_result)) -dp.addHandler(CallbackQueryHandler(select_game)) -dp.addHandler(CommandHandler('start', start_game, pass_args=True)) -dp.addHandler(CommandHandler('new', new_game)) -dp.addHandler(CommandHandler('join', join_game)) -dp.addHandler(CommandHandler('leave', leave_game)) -dp.addHandler(CommandHandler('open', open_game)) -dp.addHandler(CommandHandler('close', close_game)) -dp.addHandler(CommandHandler('skip', skip_player)) -dp.addHandler(CommandHandler('help', help)) -dp.addHandler(CommandHandler('source', source)) -dp.addHandler(CommandHandler('news', news)) -dp.addHandler(MessageHandler([Filters.status_update], status_update)) -dp.addErrorHandler(error) +dp.add_handler(InlineQueryHandler(reply_to_query)) +dp.add_handler(ChosenInlineResultHandler(process_result)) +dp.add_handler(CallbackQueryHandler(select_game)) +dp.add_handler(CommandHandler('start', start_game, pass_args=True)) +dp.add_handler(CommandHandler('new', new_game)) +dp.add_handler(CommandHandler('join', join_game)) +dp.add_handler(CommandHandler('leave', leave_game)) +dp.add_handler(CommandHandler('open', open_game)) +dp.add_handler(CommandHandler('close', close_game)) +dp.add_handler(CommandHandler('skip', skip_player)) +dp.add_handler(CommandHandler('help', help)) +dp.add_handler(CommandHandler('source', source)) +dp.add_handler(CommandHandler('news', news)) +dp.add_handler(MessageHandler([Filters.status_update], status_update)) +dp.add_error_handler(error) start_bot(u) u.idle() diff --git a/card.py b/card.py index aa3b303..a291e45 100644 --- a/card.py +++ b/card.py @@ -180,9 +180,7 @@ STICKERS_GREY = { class Card(object): - """ - This class represents a card. - """ + """This class represents an UNO card""" def __init__(self, color, value, special=None): self.color = color @@ -205,16 +203,16 @@ class Card(object): return '%s%s' % (COLOR_ICONS[self.color], self.value.capitalize()) def __eq__(self, other): - """ Needed for sorting the cards """ + """Needed for sorting the cards""" return str(self) == str(other) def __lt__(self, other): - """ Needed for sorting the cards """ + """Needed for sorting the cards""" return str(self) < str(other) def from_str(string): - """ Decode a Card object from a string """ + """Decodes a Card object from a string""" if string not in SPECIALS: color, value = string.split('_') return Card(color, value) diff --git a/chat_setting.py b/chat_setting.py new file mode 100644 index 0000000..baccfee --- /dev/null +++ b/chat_setting.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python3 +# +# Telegram bot to play UNO in group chats +# Copyright (c) 2016 Jannes Höke +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +pass diff --git a/database.py b/database.py new file mode 100644 index 0000000..30b2e54 --- /dev/null +++ b/database.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +# +# Telegram bot to play UNO in group chats +# Copyright (c) 2016 Jannes Höke +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +from pony.orm import Database, db_session, Optional, Required, Set, PrimaryKey + +# Database singleton +db = Database() diff --git a/deck.py b/deck.py index 0d3b82c..0fda198 100644 --- a/deck.py +++ b/deck.py @@ -18,9 +18,11 @@ from random import shuffle +import logging + import card as c from card import Card -import logging +from errors import DeckEmptyError class Deck(object): @@ -45,22 +47,25 @@ class Deck(object): self.shuffle() def shuffle(self): - """ Shuffle the deck """ + """Shuffles the deck""" self.logger.debug("Shuffling Deck") shuffle(self.cards) def draw(self): - """ Draw a card from this deck """ + """Draws a card from this deck""" try: card = self.cards.pop() self.logger.debug("Drawing card " + str(card)) return card except IndexError: - while len(self.graveyard): - self.cards.append(self.graveyard.pop()) - self.shuffle() - return self.draw() + if len(self.graveyard): + while len(self.graveyard): + self.cards.append(self.graveyard.pop()) + self.shuffle() + return self.draw() + else: + raise DeckEmptyError() def dismiss(self, card): - """ All played cards should be returned into the deck """ + """Returns a card to the deck""" self.graveyard.append(card) diff --git a/errors.py b/errors.py new file mode 100644 index 0000000..ac0575a --- /dev/null +++ b/errors.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# +# Telegram bot to play UNO in group chats +# Copyright (c) 2016 Jannes Höke +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +class NoGameInChatError(Exception): + pass + + +class AlreadyJoinedError(Exception): + pass + + +class LobbyClosedError(Exception): + pass + + +class NotEnoughPlayersError(Exception): + pass + + +class DeckEmptyError(Exception): + pass diff --git a/game.py b/game.py index 0e4e632..169e4fc 100644 --- a/game.py +++ b/game.py @@ -48,6 +48,7 @@ class Game(object): @property def players(self): + """Returns a list of all players in this game""" players = list() if not self.current_player: return players @@ -61,18 +62,23 @@ class Game(object): return players def reverse(self): - """ Reverse the direction of play """ + """Reverses the direction of game""" self.reversed = not self.reversed def turn(self): - """ Mark the turn as over and change the current player """ + """Marks the turn as over and change the current player""" self.logger.debug("Next Player") self.current_player = self.current_player.next self.current_player.drew = False self.current_player.turn_started = datetime.now() + self.choosing_color = False def play_card(self, card): - """ Play a card and trigger its effects """ + """ + Plays a card and triggers its effects. + Should be called only from Player.play or on game start to play the + first card + """ self.deck.dismiss(self.last_card) self.last_card = card @@ -100,7 +106,6 @@ class Game(object): self.choosing_color = True def choose_color(self, color): - """ Carries out the color choosing and turns the game """ + """Carries out the color choosing and turns the game""" self.last_card.color = color self.turn() - self.choosing_color = False diff --git a/game_manager.py b/game_manager.py index 4e3ca4b..1f87f28 100644 --- a/game_manager.py +++ b/game_manager.py @@ -21,6 +21,8 @@ import logging from game import Game from player import Player +from errors import (AlreadyJoinedError, LobbyClosedError, NoGameInChatError, + NotEnoughPlayersError) class GameManager(object): @@ -38,7 +40,7 @@ class GameManager(object): """ chat_id = chat.id - self.logger.info("Creating new game with id " + str(chat_id)) + self.logger.debug("Creating new game in chat " + str(chat_id)) game = Game(chat) if chat_id not in self.chatid_games: @@ -47,13 +49,17 @@ class GameManager(object): self.chatid_games[chat_id].append(game) return game - def join_game(self, chat_id, user): + def join_game(self, user, chat): """ Create a player from the Telegram user and add it to the game """ - self.logger.info("Joining game with id " + str(chat_id)) + self.logger.info("Joining game with id " + str(chat.id)) + try: - game = self.chatid_games[chat_id][-1] + game = self.chatid_games[chat.id][-1] except (KeyError, IndexError): - return None + raise NoGameInChatError() + + if not game.open: + raise LobbyClosedError() if user.id not in self.userid_players: self.userid_players[user.id] = list() @@ -61,76 +67,81 @@ class GameManager(object): players = self.userid_players[user.id] # Don not re-add a player and remove the player from previous games in - # this chat + # this chat, if he is in one of them for player in players: if player in game.players: - return False + raise AlreadyJoinedError() else: - self.leave_game(user, chat_id) + try: + self.leave_game(user, chat) + except NoGameInChatError: + pass player = Player(game, user) players.append(player) self.userid_current[user.id] = player - return True - def leave_game(self, user, chat_id): + def leave_game(self, user, chat): """ Remove a player from its current game """ - try: - players = self.userid_players[user.id] - games = self.chatid_games[chat_id] - for player in players: - for game in games: - if player in game.players: - if player is game.current_player: - game.turn() + player = self.player_for_user_in_chat(user, chat) + players = self.userid_players.get(user.id, list()) - player.leave() - players.remove(player) + if not player: + raise NoGameInChatError - # If this is the selected game, switch to another - if self.userid_current[user.id] is player: - if len(players): - self.userid_current[user.id] = players[0] - else: - del self.userid_current[user.id] - return True + game = player.game + + if len(game.players) < 3: + raise NotEnoughPlayersError() + + if player is game.current_player: + game.turn() + + player.leave() + players.remove(player) + + # If this is the selected game, switch to another + if self.userid_current.get(user.id, None) is player: + if players: + self.userid_current[user.id] = players[0] else: - return False + del self.userid_current[user.id] + del self.userid_players[user.id] - except KeyError: - return False - - def end_game(self, chat_id, user): + def end_game(self, chat, user): """ End a game """ - self.logger.info("Game in chat " + str(chat_id) + " ended") - players = self.userid_players[user.id] - games = self.chatid_games[chat_id] - the_game = None + self.logger.info("Game in chat " + str(chat.id) + " ended") # Find the correct game instance to end - for player in players: - for game in games: - if player in game.players: - the_game = game - break - if the_game: - break - else: - return + player = self.player_for_user_in_chat(user, chat) - for player in the_game.players: + if not player: + raise NoGameInChatError + + game = player.game + + # Clear game + for player_in_game in game.players: this_users_players = self.userid_players[player.user.id] - this_users_players.remove(player) - if len(this_users_players) is 0: + this_users_players.remove(player_in_game) + + if this_users_players: + self.userid_current[player.user.id] = this_users_players[0] + else: del self.userid_players[player.user.id] del self.userid_current[player.user.id] - else: - self.userid_current[player.user.id] = this_users_players[0] - self.chatid_games[chat_id].remove(the_game) - return + self.chatid_games[chat.id].remove(game) + + def player_for_user_in_chat(self, user, chat): + players = self.userid_players.get(user.id, list()) + for player in players: + if player.game.chat.id == chat.id: + return player + else: + return None diff --git a/player.py b/player.py index a8531ff..db24a09 100644 --- a/player.py +++ b/player.py @@ -58,7 +58,7 @@ class Player(object): self.waiting_time = 90 def leave(self): - """ Leave the current game """ + """Removes player from the game and closes the gap in the list""" if self.next is self: return @@ -100,8 +100,23 @@ class Player(object): else: self._next = player + def draw(self): + """Draws 1+ cards from the deck, depending on the draw counter""" + _amount = self.game.draw_counter or 1 + + for i in range(_amount): + self.cards.append(self.game.deck.draw()) + + self.game.draw_counter = 0 + self.drew = True + + def play(self, card): + """Plays a card and removes it from hand""" + self.cards.remove(card) + self.game.play_card(card) + def playable_cards(self): - """ Returns a list of the cards this player can play right now """ + """Returns a list of the cards this player can play right now""" playable = list() last = self.game.last_card @@ -115,7 +130,7 @@ class Player(object): # You may only play a +4 if you have no cards of the correct color self.bluffing = False for card in cards: - if self.card_playable(card, playable): + if self._card_playable(card): self.logger.debug("Matching!") playable.append(card) @@ -127,8 +142,8 @@ class Player(object): return playable - def card_playable(self, card, playable): - """ Check a single card if it can be played """ + def _card_playable(self, card): + """Check a single card if it can be played""" is_playable = True last = self.game.last_card @@ -149,9 +164,8 @@ class Player(object): (card.special == c.CHOOSE or card.special == c.DRAW_FOUR): self.logger.debug("Can't play colorchooser on another one") is_playable = False - elif not last.color or card in playable: - self.logger.debug("Last card has no color or the card was " - "already added to the list") + elif not last.color: + self.logger.debug("Last card has no color") is_playable = False return is_playable diff --git a/results.py b/results.py index 6da172d..674e0eb 100644 --- a/results.py +++ b/results.py @@ -17,6 +17,8 @@ # along with this program. If not, see . +"""Defines helper functions to build the inline result list""" + from uuid import uuid4 from telegram import InlineQueryResultArticle, InputTextMessageContent, \ @@ -27,6 +29,7 @@ from utils import * def add_choose_color(results): + """Add choose color options""" for color in c.COLORS: results.append( InlineQueryResultArticle( @@ -40,6 +43,7 @@ def add_choose_color(results): def add_other_cards(playable, player, results, game): + """Add hand cards when choosing colors""" if not playable: playable = list() @@ -61,13 +65,15 @@ def add_other_cards(playable, player, results, game): def player_list(game): + """Generate list of player strings""" players = list() for player in game.players: - add_player(player, players) + player.user.first_name + " (%d cards)" % len(player.cards) return players def add_no_game(results): + """Add text result if user is not playing""" results.append( InlineQueryResultArticle( "nogame", @@ -81,6 +87,7 @@ def add_no_game(results): def add_not_started(results): + """Add text result if the game has not yet started""" results.append( InlineQueryResultArticle( "nogame", @@ -92,6 +99,7 @@ def add_not_started(results): def add_draw(player, results): + """Add option to draw""" results.append( Sticker( "draw", sticker_file_id=c.STICKERS['option_draw'], @@ -103,6 +111,7 @@ def add_draw(player, results): def add_gameinfo(game, results): + """Add option to show game info""" players = player_list(game) results.append( @@ -119,6 +128,7 @@ def add_gameinfo(game, results): def add_pass(results): + """Add option to pass""" results.append( Sticker( "pass", sticker_file_id=c.STICKERS['option_pass'], @@ -128,6 +138,7 @@ def add_pass(results): def add_call_bluff(results): + """Add option to call a bluff""" results.append( Sticker( "call_bluff", @@ -138,7 +149,8 @@ def add_call_bluff(results): ) -def add_play_card(game, card, results, can_play): +def add_card(game, card, results, can_play): + """Add an option that represents a card""" players = player_list(game) if can_play: @@ -156,8 +168,3 @@ def add_play_card(game, card, results, can_play): "Players: " + " -> ".join(players))) ) - -def add_player(itplayer, players): - players.append(itplayer.user.first_name + " (%d cards)" - % len(itplayer.cards)) - diff --git a/test/test.py b/test/test.py deleted file mode 100644 index 3c18722..0000000 --- a/test/test.py +++ /dev/null @@ -1,41 +0,0 @@ -import unittest -from game import Game -from player import Player - - -class Test(unittest.TestCase): - - game = None - - def setUp(self): - self.game = Game() - - def test_insert(self): - p0 = Player(self.game, "Player 0") - p1 = Player(self.game, "Player 1") - p2 = Player(self.game, "Player 2") - - self.assertEqual(p0, p2.next) - self.assertEqual(p1, p0.next) - self.assertEqual(p2, p1.next) - - self.assertEqual(p0.prev, p2) - self.assertEqual(p1.prev, p0) - self.assertEqual(p2.prev, p1) - - def test_reverse(self): - p0 = Player(self.game, "Player 0") - p1 = Player(self.game, "Player 1") - p2 = Player(self.game, "Player 2") - self.game.reverse() - p3 = Player(self.game, "Player 3") - - self.assertEqual(p0, p3.next) - self.assertEqual(p1, p2.next) - self.assertEqual(p2, p0.next) - self.assertEqual(p3, p1.next) - - self.assertEqual(p0, p2.prev) - self.assertEqual(p1, p3.prev) - self.assertEqual(p2, p1.prev) - self.assertEqual(p3, p0.prev) diff --git a/test/test_game_manager.py b/test/test_game_manager.py new file mode 100644 index 0000000..e06882e --- /dev/null +++ b/test/test_game_manager.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 +# +# Telegram bot to play UNO in group chats +# Copyright (c) 2016 Jannes Höke +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +import unittest + +from telegram import User, Chat + +from game_manager import GameManager +from errors import AlreadyJoinedError, LobbyClosedError, NoGameInChatError, \ + NotEnoughPlayersError + + +class Test(unittest.TestCase): + + game = None + + def setUp(self): + self.gm = GameManager() + + self.chat0 = Chat(0, 'group') + self.chat1 = Chat(1, 'group') + self.chat2 = Chat(2, 'group') + + self.user0 = User(0, 'user0') + self.user1 = User(1, 'user1') + self.user2 = User(2, 'user2') + + def test_new_game(self): + g0 = self.gm.new_game(self.chat0) + g1 = self.gm.new_game(self.chat1) + + self.assertListEqual(self.gm.chatid_games[0], [g0]) + self.assertListEqual(self.gm.chatid_games[1], [g1]) + + def test_join_game(self): + + self.assertRaises(NoGameInChatError, + self.gm.join_game, + *(self.user0, self.chat0)) + + g0 = self.gm.new_game(self.chat0) + + self.gm.join_game(self.user0, self.chat0) + self.assertEqual(len(g0.players), 1) + + self.gm.join_game(self.user1, self.chat0) + self.assertEqual(len(g0.players), 2) + + g0.open = False + self.assertRaises(LobbyClosedError, + self.gm.join_game, + *(self.user2, self.chat0)) + + g0.open = True + self.assertRaises(AlreadyJoinedError, + self.gm.join_game, + *(self.user1, self.chat0)) diff --git a/test/test_player.py b/test/test_player.py new file mode 100644 index 0000000..e65a13f --- /dev/null +++ b/test/test_player.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 +# +# Telegram bot to play UNO in group chats +# Copyright (c) 2016 Jannes Höke +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +import unittest + +from game import Game +from player import Player +import card as c + + +class Test(unittest.TestCase): + + game = None + + def setUp(self): + self.game = Game(None) + + def test_insert(self): + p0 = Player(self.game, "Player 0") + p1 = Player(self.game, "Player 1") + p2 = Player(self.game, "Player 2") + + self.assertEqual(p0, p2.next) + self.assertEqual(p1, p0.next) + self.assertEqual(p2, p1.next) + + self.assertEqual(p0.prev, p2) + self.assertEqual(p1.prev, p0) + self.assertEqual(p2.prev, p1) + + def test_reverse(self): + p0 = Player(self.game, "Player 0") + p1 = Player(self.game, "Player 1") + p2 = Player(self.game, "Player 2") + self.game.reverse() + p3 = Player(self.game, "Player 3") + + self.assertEqual(p0, p3.next) + self.assertEqual(p1, p2.next) + self.assertEqual(p2, p0.next) + self.assertEqual(p3, p1.next) + + self.assertEqual(p0, p2.prev) + self.assertEqual(p1, p3.prev) + self.assertEqual(p2, p1.prev) + self.assertEqual(p3, p0.prev) + + def test_leave(self): + p0 = Player(self.game, "Player 0") + p1 = Player(self.game, "Player 1") + p2 = Player(self.game, "Player 2") + + p1.leave() + + self.assertEqual(p0, p2.next) + self.assertEqual(p2, p0.next) + + def test_draw(self): + p = Player(self.game, "Player 0") + + deck_before = len(self.game.deck.cards) + top_card = self.game.deck.cards[-1] + + p.draw() + + self.assertEqual(top_card, p.cards[-1]) + self.assertEqual(deck_before, len(self.game.deck.cards) + 1) + + def test_draw_two(self): + p = Player(self.game, "Player 0") + + deck_before = len(self.game.deck.cards) + self.game.draw_counter = 2 + + p.draw() + + self.assertEqual(deck_before, len(self.game.deck.cards) + 2) + + def test_playable_cards_simple(self): + p = Player(self.game, "Player 0") + + self.game.last_card = c.Card(c.RED, '5') + + p.cards = [c.Card(c.RED, '0'), c.Card(c.RED, '5'), c.Card(c.BLUE, '0'), + c.Card(c.GREEN, '5'), c.Card(c.GREEN, '8')] + + expected = [c.Card(c.RED, '0'), c.Card(c.RED, '5'), + c.Card(c.GREEN, '5')] + + self.assertListEqual(p.playable_cards(), expected) + + def test_playable_cards_on_draw_two(self): + p = Player(self.game, "Player 0") + + self.game.last_card = c.Card(c.RED, c.DRAW_TWO) + self.game.draw_counter = 2 + + p.cards = [c.Card(c.RED, c.DRAW_TWO), c.Card(c.RED, '5'), + c.Card(c.BLUE, '0'), c.Card(c.GREEN, '5'), + c.Card(c.GREEN, c.DRAW_TWO)] + + expected = [c.Card(c.RED, c.DRAW_TWO), c.Card(c.GREEN, c.DRAW_TWO)] + + self.assertListEqual(p.playable_cards(), expected) + + def test_playable_cards_on_draw_four(self): + p = Player(self.game, "Player 0") + + self.game.last_card = c.Card(c.RED, None, c.DRAW_FOUR) + self.game.draw_counter = 4 + + p.cards = [c.Card(c.RED, c.DRAW_TWO), c.Card(c.RED, '5'), + c.Card(c.BLUE, '0'), c.Card(c.GREEN, '5'), + c.Card(c.GREEN, c.DRAW_TWO), + c.Card(None, None, c.DRAW_FOUR), + c.Card(None, None, c.CHOOSE)] + + expected = list() + + self.assertListEqual(p.playable_cards(), expected) + + def test_bluffing(self): + p = Player(self.game, "Player 0") + + self.game.last_card = c.Card(c.RED, '1') + + p.cards = [c.Card(c.RED, c.DRAW_TWO), c.Card(c.RED, '5'), + c.Card(c.BLUE, '0'), c.Card(c.GREEN, '5'), + c.Card(c.RED, '5'), c.Card(c.GREEN, c.DRAW_TWO), + c.Card(None, None, c.DRAW_FOUR), + c.Card(None, None, c.CHOOSE)] + + p.playable_cards() + self.assertTrue(p.bluffing) + + p.cards = [c.Card(c.BLUE, '1'), c.Card(c.GREEN, '1'), + c.Card(c.GREEN, c.DRAW_TWO), + c.Card(None, None, c.DRAW_FOUR), + c.Card(None, None, c.CHOOSE)] + + p.playable_cards() + self.assertFalse(p.bluffing) diff --git a/user_setting.py b/user_setting.py new file mode 100644 index 0000000..54e2054 --- /dev/null +++ b/user_setting.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +# +# Telegram bot to play UNO in group chats +# Copyright (c) 2016 Jannes Höke +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +from database import db, Optional, Required, PrimaryKey + + +class UserSetting(db.Entity): + + id = PrimaryKey(int, auto=False) # Telegram User ID + lang = Optional(str, default='en') # The language setting for this user + stats = Optional(bool, default=False) # Opt-in to keep game statistics + first_places = Optional(int, default=0) # Nr. of games won in first place + games_played = Optional(int, default=0) # Nr. of games completed + cards_played = Optional(int, default=0) # Nr. of cards played