separate game logic from bot interface,

introduce exceptions instead of boolean returns,
remove repetitive code,
begin unit tests,
improve docstrings,
update to python-telegram-bot==4.1.1,
add ponyorm settings classes (unused)
This commit is contained in:
Jannes Höke 2016-05-19 20:52:50 +02:00
parent a6f2c07403
commit 6204868a18
14 changed files with 755 additions and 339 deletions

500
bot.py
View file

@ -32,8 +32,16 @@ from telegram.utils.botan import Botan
from game_manager import GameManager
from credentials import TOKEN, BOTAN_TOKEN
from start_bot import start_bot
from results import *
from utils import *
from results import (add_call_bluff, add_choose_color, add_draw, add_gameinfo,
add_no_game, add_not_started, add_other_cards, add_pass,
add_card)
from utils import display_name
import card as c
from errors import (NoGameInChatError, LobbyClosedError, AlreadyJoinedError,
NotEnoughPlayersError, DeckEmptyError)
from database import db_session
TIMEOUT = 2.5
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
@ -83,8 +91,9 @@ source_text = ("This bot is Free Software and licensed under the AGPL. "
@run_async
def send_async(bot, *args, **kwargs):
"""Send a message asynchronously"""
if 'timeout' not in kwargs:
kwargs['timeout'] = 2.5
kwargs['timeout'] = TIMEOUT
try:
bot.sendMessage(*args, **kwargs)
@ -94,8 +103,9 @@ def send_async(bot, *args, **kwargs):
@run_async
def answer_async(bot, *args, **kwargs):
"""Answer an inline query asynchronously"""
if 'timeout' not in kwargs:
kwargs['timeout'] = 2.5
kwargs['timeout'] = TIMEOUT
try:
bot.answerInlineQuery(*args, **kwargs)
@ -104,89 +114,97 @@ def answer_async(bot, *args, **kwargs):
def error(bot, update, error):
""" Simple error handler """
"""Simple error handler"""
logger.exception(error)
def new_game(bot, update):
""" Handler for the /new command """
"""Handler for the /new command"""
chat_id = update.message.chat_id
if update.message.chat.type == 'private':
help(bot, update)
else:
game = gm.new_game(update.message.chat)
game.owner = update.message.from_user
send_async(bot, chat_id,
text="Created a new game! Join the game with /join "
"and start the game with /start")
if botan:
botan.track(update.message, 'New games')
def join_game(bot, update):
""" Handler for the /join command """
chat_id = update.message.chat_id
"""Handler for the /join command"""
chat = update.message.chat
if update.message.chat.type == 'private':
help(bot, update)
else:
try:
game = gm.chatid_games[chat_id][-1]
if not game.open:
send_async(bot, chat_id, text="The lobby is closed")
return
except (KeyError, IndexError):
pass
return
joined = gm.join_game(chat_id, update.message.from_user)
if joined:
send_async(bot, chat_id,
text="Joined the game",
reply_to_message_id=update.message.message_id)
elif joined is None:
send_async(bot, chat_id,
text="No game is running at the moment. "
"Create a new game with /new",
reply_to_message_id=update.message.message_id)
else:
send_async(bot, chat_id,
text="You already joined the game. Start the game "
"with /start",
reply_to_message_id=update.message.message_id)
try:
gm.join_game(update.message.from_user, chat)
except LobbyClosedError:
send_async(bot, chat.id, text="The lobby is closed")
except NoGameInChatError:
send_async(bot, chat.id,
text="No game is running at the moment. "
"Create a new game with /new",
reply_to_message_id=update.message.message_id)
except AlreadyJoinedError:
send_async(bot, chat.id,
text="You already joined the game. Start the game "
"with /start",
reply_to_message_id=update.message.message_id)
else:
send_async(bot, chat.id,
text="Joined the game",
reply_to_message_id=update.message.message_id)
def leave_game(bot, update):
""" Handler for the /leave command """
chat_id = update.message.chat_id
"""Handler for the /leave command"""
chat = update.message.chat
user = update.message.from_user
players = gm.userid_players.get(user.id, list())
for player in players:
if player.game.chat.id == chat_id:
game = player.game
break
else:
send_async(bot, chat_id, text="You are not playing in a game in "
player = gm.player_for_user_in_chat(user, chat)
if player is None:
send_async(bot, chat.id, text="You are not playing in a game in "
"this group.",
reply_to_message_id=update.message.message_id)
return
game = player.game
user = update.message.from_user
if len(game.players) < 3:
gm.end_game(chat_id, user)
send_async(bot, chat_id, text="Game ended!")
try:
gm.leave_game(user, chat)
except NoGameInChatError:
send_async(bot, chat.id, text="You are not playing in a game in "
"this group.",
reply_to_message_id=update.message.message_id)
except NotEnoughPlayersError:
gm.end_game(chat, user)
send_async(bot, chat.id, text="Game ended!")
else:
if gm.leave_game(user, chat_id):
send_async(bot, chat_id,
text="Okay. Next Player: " +
display_name(game.current_player.user),
reply_to_message_id=update.message.message_id)
else:
send_async(bot, chat_id, text="You are not playing in a game in "
"this group.",
reply_to_message_id=update.message.message_id)
send_async(bot, chat.id,
text="Okay. Next Player: " +
display_name(game.current_player.user),
reply_to_message_id=update.message.message_id)
@run_async
def select_game(bot, update):
"""Handler for callback queries to select the current game"""
chat_id = int(update.callback_query.data)
user_id = update.callback_query.from_user.id
@ -196,8 +214,9 @@ def select_game(bot, update):
gm.userid_current[user_id] = player
break
else:
send_async(bot, update.callback_query.message.chat_id,
text="Game not found :(")
bot.sendMessage(update.callback_query.message.chat_id,
text="Game not found.",
timeout=TIMEOUT)
return
back = [[InlineKeyboardButton(text='Back to last group',
@ -206,7 +225,8 @@ def select_game(bot, update):
bot.answerCallbackQuery(update.callback_query.id,
text="Please switch to the group you selected!",
show_alert=False,
timeout=2.5)
timeout=TIMEOUT)
bot.editMessageText(chat_id=update.callback_query.message.chat_id,
message_id=update.callback_query.message.message_id,
text="Selected group: %s\n"
@ -215,87 +235,115 @@ def select_game(bot, update):
% gm.userid_current[user_id].game.chat.title,
reply_markup=InlineKeyboardMarkup(back),
parse_mode=ParseMode.HTML,
timeout=2.5)
timeout=TIMEOUT)
def status_update(bot, update):
""" Remove player from game if user leaves the group """
"""Remove player from game if user leaves the group"""
chat = update.message.chat
if update.message.left_chat_member:
try:
chat_id = update.message.chat_id
user = update.message.left_chat_member
except KeyError:
return
if gm.leave_game(user, chat_id):
send_async(bot, chat_id, text="Removing %s from the game"
try:
gm.leave_game(user, chat)
except NoGameInChatError:
pass
except NotEnoughPlayersError:
gm.end_game(chat, user)
send_async(bot, chat.id, text="Game ended!")
else:
send_async(bot, chat.id, text="Removing %s from the game"
% display_name(user))
def start_game(bot, update, args):
""" Handler for the /start command """
"""Handler for the /start command"""
if update.message.chat.type != 'private':
# Show the first card
chat_id = update.message.chat_id
chat = update.message.chat
try:
game = gm.chatid_games[chat_id][-1]
game = gm.chatid_games[chat.id][-1]
except (KeyError, IndexError):
send_async(bot, chat_id, text="There is no game running in this "
send_async(bot, chat.id, text="There is no game running in this "
"chat. Create a new one with /new")
return
if game.current_player is None or \
game.current_player is game.current_player.next:
send_async(bot, chat_id, text="At least two players must /join "
if game.started:
send_async(bot, chat.id, text="The game has already started")
elif len(game.players) < 2:
send_async(bot, chat.id, text="At least two players must /join "
"the game before you can start it")
elif game.started:
send_async(bot, chat_id, text="The game has already started")
else:
game.play_card(game.last_card)
game.started = True
bot.sendSticker(chat_id,
sticker=c.STICKERS[str(game.last_card)],
timeout=2.5)
send_async(bot, chat_id,
text="First player: %s\n"
"Use /close to stop people from joining the game."
% display_name(game.current_player.user))
@run_async
def send_first():
"""Send the first card and player"""
bot.sendSticker(chat.id,
sticker=c.STICKERS[str(game.last_card)],
timeout=TIMEOUT)
bot.sendMessage(chat.id,
text="First player: %s\n"
"Use /close to stop people from joining "
"the game."
% display_name(game.current_player.user),
timeout=TIMEOUT)
send_first()
elif len(args) and args[0] == 'select':
players = gm.userid_players[update.message.from_user.id]
groups = list()
for player in players:
groups.append([InlineKeyboardButton(text=player.game.chat.title,
callback_data=
str(player.game.chat.id))])
title = player.game.chat.title
if player is gm.userid_current[update.message.from_user.id]:
title = '- %s -' % player.game.chat.title
groups.append(
[InlineKeyboardButton(text=title,
callback_data=str(player.game.chat.id))]
)
send_async(bot, update.message.chat_id,
text='Please select the group you want to play in. ',
text='Please select the group you want to play in.',
reply_markup=InlineKeyboardMarkup(groups))
else:
help(bot, update)
def close_game(bot, update):
""" Handler for the /close command """
chat_id = update.message.chat_id
"""Handler for the /close command"""
chat = update.message.chat
user = update.message.from_user
games = gm.chatid_games.get(chat_id)
games = gm.chatid_games.get(chat.id)
if not games:
send_async(bot, chat_id, text="There is no running game")
send_async(bot, chat.id, text="There is no running game in this chat.")
return
game = games[-1]
if game.owner.id == user.id:
game.open = False
send_async(bot, chat_id, text="Closed the lobby. "
send_async(bot, chat.id, text="Closed the lobby. "
"No more players can join this game.")
return
else:
send_async(bot, chat_id,
send_async(bot, chat.id,
text="Only the game creator (%s) can do that"
% game.owner.first_name,
reply_to_message_id=update.message.message_id)
@ -303,118 +351,115 @@ def close_game(bot, update):
def open_game(bot, update):
""" Handler for the /open command """
chat_id = update.message.chat_id
"""Handler for the /open command"""
chat = update.message.chat
user = update.message.from_user
games = gm.chatid_games.get(chat_id)
games = gm.chatid_games.get(chat.id)
if not games:
send_async(bot, chat_id, text="There is no running game")
send_async(bot, chat.id, text="There is no running game in this chat.")
return
game = games[-1]
if game.owner.id == user.id:
game.open = True
send_async(bot, chat_id, text="Opened the lobby. "
send_async(bot, chat.id, text="Opened the lobby. "
"New players may /join the game.")
return
else:
send_async(bot, chat_id,
text="Only the game creator (%s) can do that"
send_async(bot, chat.id,
text="Only the game creator (%s) can do that."
% game.owner.first_name,
reply_to_message_id=update.message.message_id)
return
def skip_player(bot, update):
""" Handler for the /skip command """
chat_id = update.message.chat_id
"""Handler for the /skip command"""
chat = update.message.chat
user = update.message.from_user
games = gm.chatid_games.get(chat_id)
players = gm.userid_players.get(user.id)
if not games:
send_async(bot, chat_id, text="There is no running game")
player = gm.player_for_user_in_chat(user, chat)
if not player:
send_async(bot, chat.id, text="You are not playing in a game in this "
"chat.")
return
if not players:
send_async(bot, chat_id, text="You are not playing")
return
game = player.game
skipped_player = game.current_player
next_player = game.current_player.next
for game in games:
for player in players:
if player in game.players:
started = game.current_player.turn_started
now = datetime.now()
delta = (now - started).seconds
started = skipped_player.turn_started
now = datetime.now()
delta = (now - started).seconds
if delta < game.current_player.waiting_time:
send_async(bot, chat_id,
text="Please wait %d seconds"
% (game.current_player.waiting_time -
delta),
reply_to_message_id=
update.message.message_id)
return
if delta < skipped_player.waiting_time:
send_async(bot, chat.id,
text="Please wait %d seconds"
% (skipped_player.waiting_time - delta),
reply_to_message_id=update.message.message_id)
elif game.current_player.waiting_time > 0:
game.current_player.anti_cheat += 1
game.current_player.waiting_time -= 30
game.current_player.cards.append(game.deck.draw())
send_async(bot, chat_id,
text="Waiting time to skip this player has "
"been reduced to %d seconds.\n"
"Next player: %s"
% (game.current_player.waiting_time,
display_name(
game.current_player.next.user)))
game.turn()
return
elif skipped_player.waiting_time > 0:
skipped_player.anti_cheat += 1
skipped_player.waiting_time -= 30
try:
skipped_player.draw()
except DeckEmptyError:
pass
elif len(game.players) > 2:
send_async(bot, chat_id,
text="%s was skipped four times in a row "
"and has been removed from the game.\n"
"Next player: %s"
% (display_name(game.current_player.user),
display_name(
game.current_player.next.user)))
send_async(bot, chat.id,
text="Waiting time to skip this player has "
"been reduced to %d seconds.\n"
"Next player: %s"
% (skipped_player.waiting_time,
display_name(next_player.user)))
game.turn()
gm.leave_game(game.current_player.user, chat_id)
return
else:
send_async(bot, chat_id,
text="%s was skipped four times in a row "
"and has been removed from the game.\n"
"The game ended."
% display_name(game.current_player.user))
else:
try:
gm.leave_game(skipped_player.user, chat)
send_async(bot, chat.id,
text="%s was skipped four times in a row "
"and has been removed from the game.\n"
"Next player: %s"
% (display_name(skipped_player.user),
display_name(next_player.user)))
gm.end_game(chat_id, game.current_player.user)
return
except NotEnoughPlayersError:
send_async(bot, chat.id,
text="%s was skipped four times in a row "
"and has been removed from the game.\n"
"The game ended."
% display_name(skipped_player.user))
gm.end_game(chat.id, skipped_player.user)
def help(bot, update):
""" Handler for the /help command """
"""Handler for the /help command"""
send_async(bot, update.message.chat_id, text=help_text,
parse_mode=ParseMode.HTML, disable_web_page_preview=True)
def source(bot, update):
""" Handler for the /help command """
"""Handler for the /help command"""
send_async(bot, update.message.chat_id, text=source_text,
parse_mode=ParseMode.HTML, disable_web_page_preview=True)
def news(bot, update):
""" Handler for the /news command """
"""Handler for the /news command"""
send_async(bot, update.message.chat_id,
text="All news here: https://telegram.me/unobotupdates",
disable_web_page_preview=True)
def reply_to_query(bot, update):
""" Builds the result list for inline queries and answers to the client """
"""
Handler for inline queries.
Builds the result list for inline queries and answers to the client.
"""
results = list()
playable = list()
switch = None
@ -429,9 +474,11 @@ def reply_to_query(bot, update):
else:
if not game.started:
add_not_started(results)
elif user_id == game.current_player.user.id:
if game.choosing_color:
add_choose_color(results)
add_other_cards(playable, player, results, game)
else:
if not player.drew:
add_draw(player, results)
@ -443,19 +490,18 @@ def reply_to_query(bot, update):
add_call_bluff(results)
playable = player.playable_cards()
added_ids = list()
added_ids = list() # Duplicates are not allowed
for card in sorted(player.cards):
add_play_card(game, card, results,
can_play=(card in playable and
add_card(game, card, results,
can_play=(card in playable and
str(card) not in added_ids))
added_ids.append(str(card))
if False or game.choosing_color:
add_other_cards(playable, player, results, game)
elif user_id != game.current_player.user.id or not game.started:
for card in sorted(player.cards):
add_play_card(game, card, results, can_play=False)
add_card(game, card, results, can_play=False)
else:
add_gameinfo(game, results)
@ -470,13 +516,16 @@ def reply_to_query(bot, update):
def process_result(bot, update):
""" Check the players actions and act accordingly """
"""
Handler for chosen inline results.
Checks the players actions and acts accordingly.
"""
try:
user = update.chosen_inline_result.from_user
player = gm.userid_current[user.id]
game = player.game
result_id = update.chosen_inline_result.result_id
chat_id = game.chat.id
chat = game.chat
except KeyError:
return
@ -491,103 +540,130 @@ def process_result(bot, update):
elif len(result_id) == 36: # UUID result
return
elif int(anti_cheat) != last_anti_cheat:
send_async(bot, chat_id,
send_async(bot, chat.id,
text="Cheat attempt by %s" % display_name(player.user))
return
elif result_id == 'call_bluff':
reset_waiting_time(bot, chat_id, player)
do_call_bluff(bot, chat_id, game, player)
reset_waiting_time(bot, player)
do_call_bluff(bot, player)
elif result_id == 'draw':
reset_waiting_time(bot, chat_id, player)
do_draw(game, player)
reset_waiting_time(bot, player)
do_draw(player)
elif result_id == 'pass':
game.turn()
elif result_id in c.COLORS:
game.choose_color(result_id)
else:
reset_waiting_time(bot, chat_id, player)
do_play_card(bot, chat_id, game, player, result_id, user)
reset_waiting_time(bot, player)
do_play_card(bot, player, result_id)
if game in gm.chatid_games.get(chat_id, list()):
send_async(bot, chat_id, text="Next player: " +
if game in gm.chatid_games.get(chat.id, list()):
send_async(bot, chat.id, text="Next player: " +
display_name(game.current_player.user))
def reset_waiting_time(bot, chat_id, player):
def reset_waiting_time(bot, player):
"""Resets waiting time for a player and sends a notice to the group"""
chat = player.game.chat
if player.waiting_time < 90:
player.waiting_time = 90
send_async(bot, chat_id, text="Waiting time for %s has been reset to "
send_async(bot, chat.id, text="Waiting time for %s has been reset to "
"90 seconds" % display_name(player.user))
def do_play_card(bot, chat_id, game, player, result_id, user):
def do_play_card(bot, player, result_id):
"""Plays the selected card and sends an update to the group if needed"""
card = c.from_str(result_id)
game.play_card(card)
player.cards.remove(card)
player.play(card)
game = player.game
chat = game.chat
user = player.user
if game.choosing_color:
send_async(bot, chat_id, text="Please choose a color")
send_async(bot, chat.id, text="Please choose a color")
if len(player.cards) == 1:
send_async(bot, chat_id, text="UNO!")
send_async(bot, chat.id, text="UNO!")
if len(player.cards) == 0:
send_async(bot, chat_id, text="%s won!" % user.first_name)
if len(game.players) < 3:
send_async(bot, chat_id, text="Game ended!")
gm.end_game(chat_id, user)
else:
gm.leave_game(user, chat_id)
send_async(bot, chat.id, text="%s won!" % user.first_name)
try:
gm.leave_game(user, chat)
except NotEnoughPlayersError:
send_async(bot, chat.id, text="Game ended!")
gm.end_game(chat, user)
if botan:
botan.track(Message(randint(1, 1000000000), user, datetime.now(),
Chat(chat_id, 'group')),
Chat(chat.id, 'group')),
'Played cards')
def do_draw(game, player):
def do_draw(bot, player):
"""Does the drawing"""
game = player.game
draw_counter_before = game.draw_counter
for n in range(game.draw_counter or 1):
player.cards.append(game.deck.draw())
game.draw_counter = 0
player.drew = True
try:
player.draw()
except DeckEmptyError:
send_async(bot, player.game.chat.id,
text="There are no more cards in the deck.")
if (game.last_card.value == c.DRAW_TWO or
game.last_card.special == c.DRAW_FOUR) and \
draw_counter_before > 0:
game.turn()
def do_call_bluff(bot, chat_id, game, player):
def do_call_bluff(bot, player):
"""Handles the bluff calling"""
game = player.game
chat = game.chat
if player.prev.bluffing:
send_async(bot, chat_id, text="Bluff called! Giving %d cards to %s"
send_async(bot, chat.id, text="Bluff called! Giving %d cards to %s"
% (game.draw_counter,
player.prev.user.first_name))
for i in range(game.draw_counter):
player.prev.cards.append(game.deck.draw())
try:
player.prev.draw()
except DeckEmptyError:
send_async(bot, player.game.chat.id,
text="There are no more cards in the deck.")
else:
send_async(bot, chat_id, text="%s didn't bluff! Giving %d cards to %s"
game.draw_counter += 2
send_async(bot, chat.id, text="%s didn't bluff! Giving %d cards to %s"
% (player.prev.user.first_name,
game.draw_counter + 2,
game.draw_counter,
player.user.first_name))
for i in range(game.draw_counter + 2):
player.cards.append(game.deck.draw())
game.draw_counter = 0
try:
player.draw()
except DeckEmptyError:
send_async(bot, player.game.chat.id,
text="There are no more cards in the deck.")
game.turn()
# Add all handlers to the dispatcher and run the bot
dp.addHandler(InlineQueryHandler(reply_to_query))
dp.addHandler(ChosenInlineResultHandler(process_result))
dp.addHandler(CallbackQueryHandler(select_game))
dp.addHandler(CommandHandler('start', start_game, pass_args=True))
dp.addHandler(CommandHandler('new', new_game))
dp.addHandler(CommandHandler('join', join_game))
dp.addHandler(CommandHandler('leave', leave_game))
dp.addHandler(CommandHandler('open', open_game))
dp.addHandler(CommandHandler('close', close_game))
dp.addHandler(CommandHandler('skip', skip_player))
dp.addHandler(CommandHandler('help', help))
dp.addHandler(CommandHandler('source', source))
dp.addHandler(CommandHandler('news', news))
dp.addHandler(MessageHandler([Filters.status_update], status_update))
dp.addErrorHandler(error)
dp.add_handler(InlineQueryHandler(reply_to_query))
dp.add_handler(ChosenInlineResultHandler(process_result))
dp.add_handler(CallbackQueryHandler(select_game))
dp.add_handler(CommandHandler('start', start_game, pass_args=True))
dp.add_handler(CommandHandler('new', new_game))
dp.add_handler(CommandHandler('join', join_game))
dp.add_handler(CommandHandler('leave', leave_game))
dp.add_handler(CommandHandler('open', open_game))
dp.add_handler(CommandHandler('close', close_game))
dp.add_handler(CommandHandler('skip', skip_player))
dp.add_handler(CommandHandler('help', help))
dp.add_handler(CommandHandler('source', source))
dp.add_handler(CommandHandler('news', news))
dp.add_handler(MessageHandler([Filters.status_update], status_update))
dp.add_error_handler(error)
start_bot(u)
u.idle()

10
card.py
View file

@ -180,9 +180,7 @@ STICKERS_GREY = {
class Card(object):
"""
This class represents a card.
"""
"""This class represents an UNO card"""
def __init__(self, color, value, special=None):
self.color = color
@ -205,16 +203,16 @@ class Card(object):
return '%s%s' % (COLOR_ICONS[self.color], self.value.capitalize())
def __eq__(self, other):
""" Needed for sorting the cards """
"""Needed for sorting the cards"""
return str(self) == str(other)
def __lt__(self, other):
""" Needed for sorting the cards """
"""Needed for sorting the cards"""
return str(self) < str(other)
def from_str(string):
""" Decode a Card object from a string """
"""Decodes a Card object from a string"""
if string not in SPECIALS:
color, value = string.split('_')
return Card(color, value)

20
chat_setting.py Normal file
View file

@ -0,0 +1,20 @@
#!/usr/bin/env python3
#
# Telegram bot to play UNO in group chats
# Copyright (c) 2016 Jannes Höke <uno@jhoeke.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
pass

23
database.py Normal file
View file

@ -0,0 +1,23 @@
#!/usr/bin/env python3
#
# Telegram bot to play UNO in group chats
# Copyright (c) 2016 Jannes Höke <uno@jhoeke.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pony.orm import Database, db_session, Optional, Required, Set, PrimaryKey
# Database singleton
db = Database()

21
deck.py
View file

@ -18,9 +18,11 @@
from random import shuffle
import logging
import card as c
from card import Card
import logging
from errors import DeckEmptyError
class Deck(object):
@ -45,22 +47,25 @@ class Deck(object):
self.shuffle()
def shuffle(self):
""" Shuffle the deck """
"""Shuffles the deck"""
self.logger.debug("Shuffling Deck")
shuffle(self.cards)
def draw(self):
""" Draw a card from this deck """
"""Draws a card from this deck"""
try:
card = self.cards.pop()
self.logger.debug("Drawing card " + str(card))
return card
except IndexError:
while len(self.graveyard):
self.cards.append(self.graveyard.pop())
self.shuffle()
return self.draw()
if len(self.graveyard):
while len(self.graveyard):
self.cards.append(self.graveyard.pop())
self.shuffle()
return self.draw()
else:
raise DeckEmptyError()
def dismiss(self, card):
""" All played cards should be returned into the deck """
"""Returns a card to the deck"""
self.graveyard.append(card)

37
errors.py Normal file
View file

@ -0,0 +1,37 @@
#!/usr/bin/env python3
#
# Telegram bot to play UNO in group chats
# Copyright (c) 2016 Jannes Höke <uno@jhoeke.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class NoGameInChatError(Exception):
pass
class AlreadyJoinedError(Exception):
pass
class LobbyClosedError(Exception):
pass
class NotEnoughPlayersError(Exception):
pass
class DeckEmptyError(Exception):
pass

15
game.py
View file

@ -48,6 +48,7 @@ class Game(object):
@property
def players(self):
"""Returns a list of all players in this game"""
players = list()
if not self.current_player:
return players
@ -61,18 +62,23 @@ class Game(object):
return players
def reverse(self):
""" Reverse the direction of play """
"""Reverses the direction of game"""
self.reversed = not self.reversed
def turn(self):
""" Mark the turn as over and change the current player """
"""Marks the turn as over and change the current player"""
self.logger.debug("Next Player")
self.current_player = self.current_player.next
self.current_player.drew = False
self.current_player.turn_started = datetime.now()
self.choosing_color = False
def play_card(self, card):
""" Play a card and trigger its effects """
"""
Plays a card and triggers its effects.
Should be called only from Player.play or on game start to play the
first card
"""
self.deck.dismiss(self.last_card)
self.last_card = card
@ -100,7 +106,6 @@ class Game(object):
self.choosing_color = True
def choose_color(self, color):
""" Carries out the color choosing and turns the game """
"""Carries out the color choosing and turns the game"""
self.last_card.color = color
self.turn()
self.choosing_color = False

View file

@ -21,6 +21,8 @@ import logging
from game import Game
from player import Player
from errors import (AlreadyJoinedError, LobbyClosedError, NoGameInChatError,
NotEnoughPlayersError)
class GameManager(object):
@ -38,7 +40,7 @@ class GameManager(object):
"""
chat_id = chat.id
self.logger.info("Creating new game with id " + str(chat_id))
self.logger.debug("Creating new game in chat " + str(chat_id))
game = Game(chat)
if chat_id not in self.chatid_games:
@ -47,13 +49,17 @@ class GameManager(object):
self.chatid_games[chat_id].append(game)
return game
def join_game(self, chat_id, user):
def join_game(self, user, chat):
""" Create a player from the Telegram user and add it to the game """
self.logger.info("Joining game with id " + str(chat_id))
self.logger.info("Joining game with id " + str(chat.id))
try:
game = self.chatid_games[chat_id][-1]
game = self.chatid_games[chat.id][-1]
except (KeyError, IndexError):
return None
raise NoGameInChatError()
if not game.open:
raise LobbyClosedError()
if user.id not in self.userid_players:
self.userid_players[user.id] = list()
@ -61,76 +67,81 @@ class GameManager(object):
players = self.userid_players[user.id]
# Don not re-add a player and remove the player from previous games in
# this chat
# this chat, if he is in one of them
for player in players:
if player in game.players:
return False
raise AlreadyJoinedError()
else:
self.leave_game(user, chat_id)
try:
self.leave_game(user, chat)
except NoGameInChatError:
pass
player = Player(game, user)
players.append(player)
self.userid_current[user.id] = player
return True
def leave_game(self, user, chat_id):
def leave_game(self, user, chat):
""" Remove a player from its current game """
try:
players = self.userid_players[user.id]
games = self.chatid_games[chat_id]
for player in players:
for game in games:
if player in game.players:
if player is game.current_player:
game.turn()
player = self.player_for_user_in_chat(user, chat)
players = self.userid_players.get(user.id, list())
player.leave()
players.remove(player)
if not player:
raise NoGameInChatError
# If this is the selected game, switch to another
if self.userid_current[user.id] is player:
if len(players):
self.userid_current[user.id] = players[0]
else:
del self.userid_current[user.id]
return True
game = player.game
if len(game.players) < 3:
raise NotEnoughPlayersError()
if player is game.current_player:
game.turn()
player.leave()
players.remove(player)
# If this is the selected game, switch to another
if self.userid_current.get(user.id, None) is player:
if players:
self.userid_current[user.id] = players[0]
else:
return False
del self.userid_current[user.id]
del self.userid_players[user.id]
except KeyError:
return False
def end_game(self, chat_id, user):
def end_game(self, chat, user):
"""
End a game
"""
self.logger.info("Game in chat " + str(chat_id) + " ended")
players = self.userid_players[user.id]
games = self.chatid_games[chat_id]
the_game = None
self.logger.info("Game in chat " + str(chat.id) + " ended")
# Find the correct game instance to end
for player in players:
for game in games:
if player in game.players:
the_game = game
break
if the_game:
break
else:
return
player = self.player_for_user_in_chat(user, chat)
for player in the_game.players:
if not player:
raise NoGameInChatError
game = player.game
# Clear game
for player_in_game in game.players:
this_users_players = self.userid_players[player.user.id]
this_users_players.remove(player)
if len(this_users_players) is 0:
this_users_players.remove(player_in_game)
if this_users_players:
self.userid_current[player.user.id] = this_users_players[0]
else:
del self.userid_players[player.user.id]
del self.userid_current[player.user.id]
else:
self.userid_current[player.user.id] = this_users_players[0]
self.chatid_games[chat_id].remove(the_game)
return
self.chatid_games[chat.id].remove(game)
def player_for_user_in_chat(self, user, chat):
players = self.userid_players.get(user.id, list())
for player in players:
if player.game.chat.id == chat.id:
return player
else:
return None

View file

@ -58,7 +58,7 @@ class Player(object):
self.waiting_time = 90
def leave(self):
""" Leave the current game """
"""Removes player from the game and closes the gap in the list"""
if self.next is self:
return
@ -100,8 +100,23 @@ class Player(object):
else:
self._next = player
def draw(self):
"""Draws 1+ cards from the deck, depending on the draw counter"""
_amount = self.game.draw_counter or 1
for i in range(_amount):
self.cards.append(self.game.deck.draw())
self.game.draw_counter = 0
self.drew = True
def play(self, card):
"""Plays a card and removes it from hand"""
self.cards.remove(card)
self.game.play_card(card)
def playable_cards(self):
""" Returns a list of the cards this player can play right now """
"""Returns a list of the cards this player can play right now"""
playable = list()
last = self.game.last_card
@ -115,7 +130,7 @@ class Player(object):
# You may only play a +4 if you have no cards of the correct color
self.bluffing = False
for card in cards:
if self.card_playable(card, playable):
if self._card_playable(card):
self.logger.debug("Matching!")
playable.append(card)
@ -127,8 +142,8 @@ class Player(object):
return playable
def card_playable(self, card, playable):
""" Check a single card if it can be played """
def _card_playable(self, card):
"""Check a single card if it can be played"""
is_playable = True
last = self.game.last_card
@ -149,9 +164,8 @@ class Player(object):
(card.special == c.CHOOSE or card.special == c.DRAW_FOUR):
self.logger.debug("Can't play colorchooser on another one")
is_playable = False
elif not last.color or card in playable:
self.logger.debug("Last card has no color or the card was "
"already added to the list")
elif not last.color:
self.logger.debug("Last card has no color")
is_playable = False
return is_playable

View file

@ -17,6 +17,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Defines helper functions to build the inline result list"""
from uuid import uuid4
from telegram import InlineQueryResultArticle, InputTextMessageContent, \
@ -27,6 +29,7 @@ from utils import *
def add_choose_color(results):
"""Add choose color options"""
for color in c.COLORS:
results.append(
InlineQueryResultArticle(
@ -40,6 +43,7 @@ def add_choose_color(results):
def add_other_cards(playable, player, results, game):
"""Add hand cards when choosing colors"""
if not playable:
playable = list()
@ -61,13 +65,15 @@ def add_other_cards(playable, player, results, game):
def player_list(game):
"""Generate list of player strings"""
players = list()
for player in game.players:
add_player(player, players)
player.user.first_name + " (%d cards)" % len(player.cards)
return players
def add_no_game(results):
"""Add text result if user is not playing"""
results.append(
InlineQueryResultArticle(
"nogame",
@ -81,6 +87,7 @@ def add_no_game(results):
def add_not_started(results):
"""Add text result if the game has not yet started"""
results.append(
InlineQueryResultArticle(
"nogame",
@ -92,6 +99,7 @@ def add_not_started(results):
def add_draw(player, results):
"""Add option to draw"""
results.append(
Sticker(
"draw", sticker_file_id=c.STICKERS['option_draw'],
@ -103,6 +111,7 @@ def add_draw(player, results):
def add_gameinfo(game, results):
"""Add option to show game info"""
players = player_list(game)
results.append(
@ -119,6 +128,7 @@ def add_gameinfo(game, results):
def add_pass(results):
"""Add option to pass"""
results.append(
Sticker(
"pass", sticker_file_id=c.STICKERS['option_pass'],
@ -128,6 +138,7 @@ def add_pass(results):
def add_call_bluff(results):
"""Add option to call a bluff"""
results.append(
Sticker(
"call_bluff",
@ -138,7 +149,8 @@ def add_call_bluff(results):
)
def add_play_card(game, card, results, can_play):
def add_card(game, card, results, can_play):
"""Add an option that represents a card"""
players = player_list(game)
if can_play:
@ -156,8 +168,3 @@ def add_play_card(game, card, results, can_play):
"Players: " + " -> ".join(players)))
)
def add_player(itplayer, players):
players.append(itplayer.user.first_name + " (%d cards)"
% len(itplayer.cards))

View file

@ -1,41 +0,0 @@
import unittest
from game import Game
from player import Player
class Test(unittest.TestCase):
game = None
def setUp(self):
self.game = Game()
def test_insert(self):
p0 = Player(self.game, "Player 0")
p1 = Player(self.game, "Player 1")
p2 = Player(self.game, "Player 2")
self.assertEqual(p0, p2.next)
self.assertEqual(p1, p0.next)
self.assertEqual(p2, p1.next)
self.assertEqual(p0.prev, p2)
self.assertEqual(p1.prev, p0)
self.assertEqual(p2.prev, p1)
def test_reverse(self):
p0 = Player(self.game, "Player 0")
p1 = Player(self.game, "Player 1")
p2 = Player(self.game, "Player 2")
self.game.reverse()
p3 = Player(self.game, "Player 3")
self.assertEqual(p0, p3.next)
self.assertEqual(p1, p2.next)
self.assertEqual(p2, p0.next)
self.assertEqual(p3, p1.next)
self.assertEqual(p0, p2.prev)
self.assertEqual(p1, p3.prev)
s