class GameState:
    """ Interface a game has to implement so that ISMCTS can search it.

        These are the only functions which are absolutely necessary to
        implement ISMCTS in any imperfect information game, although they
        could be enhanced and made quicker, for example by using a
        GetRandomMove() function to generate a random move during rollout.
        By convention the players are numbered 1, 2, ..., self.numberOfPlayers.
    """

    def __init__(self):
        pass

    def GetNextPlayer(self, p):
        """ Return the player to the left of the specified player
        """
        raise NotImplementedError()

    def Clone(self):
        """ Create a deep clone of this game state.
        """
        raise NotImplementedError()

    def CloneAndRandomize(self, observer):
        """ Create a deep clone of this game state, randomizing any
            information not visible to the specified observer player.
        """
        raise NotImplementedError()

    def DoMove(self, move):
        """ Update a state by carrying out the given move.
            Must update playerToMove.
        """
        raise NotImplementedError()

    def GetMoves(self):
        """ Get all possible moves from this state.
            An empty result marks the state as terminal for the search.
        """
        raise NotImplementedError()

    def GetResult(self, player):
        """ Get the game result from the viewpoint of player.
        """
        raise NotImplementedError()

    def __repr__(self):
        """ Fallback representation.

            __repr__ must return a string; returning None (as a bare
            ``pass`` does) makes repr()/str() raise TypeError.
        """
        return "<%s>" % self.__class__.__name__


class UNOState(GameState):
    """ A state of the game UNO, backed by a game object.
    """

    def __init__(self, game):
        """ Wrap an existing game object.

            game -- the game instance whose players, deck and turn order
                    this state reads and mutates.
        """
        self.game = game

    @property
    def playerToMove(self):
        """ The player whose turn it is (the game's current player). """
        return self.game.current_player

    @property
    def numberOfPlayers(self):
        """ Number of players currently in the wrapped game. """
        return len(self.game.players)

    def CloneAndRandomize(self, observer):
        """ Build a fresh game that matches what `observer` can see.

            The draw counter, the top card and the graveyard are copied,
            the observer keeps their exact hand, and every other player is
            dealt as many cards from the clone's deck as they currently hold.

            observer -- the player object (identity-compared) whose hand is
                        known; all other hands are re-dealt.
            Returns a new UNOState wrapping the cloned game.
        """
        game = UNOGame(None)
        # Put the start card the new game drew for itself back into the deck
        # before overriding the top card with the real one.
        game.deck.cards.append(game.last_card)
        game.draw_counter = self.game.draw_counter

        game.last_card = self.game.last_card

        # Cards that were already played cannot be drawn again.
        game.deck.cards = list_subtract_unsorted(game.deck.cards,
                                                 self.game.deck.graveyard)
        game.deck.graveyard = list(self.game.deck.graveyard)
        # NOTE(review): the observer's own hand and the current top card are
        # not removed from the clone's deck here, so hidden hands may receive
        # copies of them -- confirm against Game/Deck before changing.

        for player in self.game.players:
            p = UNOPlayer(game, None)
            if player is observer:
                p.cards = list(player.cards)
            else:
                for _ in range(len(player.cards)):
                    p.cards.append(game.deck.draw())

        return UNOState(game)

    def DoMove(self, move):
        """ Update the state by carrying out the given move.

            move -- either the string 'draw' or a card object to play.
            Advances the turn, so playerToMove changes.
        """
        if move == 'draw':
            # Draw the accumulated penalty, or a single card if there is none.
            for _ in range(self.game.draw_counter or 1):
                self.game.current_player.cards.append(
                    self.game.deck.draw()
                )

            self.game.draw_counter = 0
            self.game.turn()
        else:
            self.game.current_player.cards.remove(move)

            self.game.play_card(move)
            if move.special:
                # The colour is already part of the move (see GetMoves), so
                # finish the turn here and clear the colour-choice flag.
                self.game.turn()
                self.game.choosing_color = False

    def GetMoves(self):
        """ Get all possible moves from this state.

            Returns an empty list when the current player holds no cards,
            ['draw'] when they hold cards but none is playable, and the
            playable cards otherwise. A playable card without a colour is
            expanded into one move per entry of c.COLORS.
        """
        if self.game.current_player.cards:
            playable = self.game.current_player.playable_cards()
            playable_converted = list()
            for card in playable:
                if not card.color:
                    for color in c.COLORS:
                        playable_converted.append(
                            c.Card(color, None, card.special)
                        )
                else:
                    playable_converted.append(card)

            return playable_converted or ['draw']
        else:
            return list()

    def GetResult(self, player):
        """ Get the game result from the viewpoint of player:
            1 if the player has no cards left, otherwise 0.
        """
        return 1 if not player.cards else 0

    def __repr__(self):
        """ Return a human-readable representation of the state
        """
        hands = '\n'.join(
            ['%s: %s' % (p.user, [str(crd) for crd in p.cards])
             for p in self.game.players]
        )
        deck = "\nDeck: %s" % str([str(crd) for crd in self.game.deck.cards])
        grav = "\nGrav: %s" % str(
            [str(crd) for crd in self.game.deck.graveyard])
        return hands + deck + grav


class Node:
    """ A node in the game tree.
        Note wins is always from the viewpoint of playerJustMoved.
    """

    def __init__(self, move=None, parent=None, playerJustMoved=None):
        self.move = move  # the move that got us to this node - "None" for the root node
        self.parentNode = parent  # "None" for the root node
        self.childNodes = []
        self.wins = 0
        self.visits = 0
        self.avails = 1  # starts at 1 so that log(avails) is defined
        self.playerJustMoved = playerJustMoved  # the only part of the state that the Node needs later

    def GetUntriedMoves(self, legalMoves):
        """ Return the elements of legalMoves for which this node does not
            have children.
        """
        # Find all moves for which this node *does* have children
        triedMoves = [child.move for child in self.childNodes]

        # Return all moves that are legal but have not been tried yet
        return [move for move in legalMoves if move not in triedMoves]

    def UCBSelectChild(self, legalMoves, exploration=0.7):
        """ Use the UCB1 formula to select a child node, filtered by the
            given list of legal moves.

            exploration is a constant balancing between exploitation and
            exploration, with default value 0.7 (approximately sqrt(2) / 2).
            Raises ValueError (from max) if no child matches legalMoves.
        """
        # Filter the list of children by the list of legal moves
        legalChildren = [child for child in self.childNodes
                         if child.move in legalMoves]

        def ucb(child):
            visits = float(child.visits)
            return (float(child.wins) / visits
                    + exploration * sqrt(log(child.avails) / visits))

        # Get the child with the highest UCB score
        best = max(legalChildren, key=ucb)

        # Update availability counts -- it is easier to do this now than
        # during backpropagation
        for child in legalChildren:
            child.avails += 1

        return best

    def AddChild(self, m, p):
        """ Add a new child node for the move m made by player p.
            Return the added child node
        """
        n = Node(move=m, parent=self, playerJustMoved=p)
        self.childNodes.append(n)
        return n

    def Update(self, terminalState):
        """ Update this node - increment the visit count by one, and increase
            the win count by the result of terminalState for
            self.playerJustMoved (skipped for the root, which has none).
        """
        self.visits += 1
        if self.playerJustMoved is not None:
            self.wins += terminalState.GetResult(self.playerJustMoved)

    def __repr__(self):
        return "[M:%s W/V/A: %4i/%4i/%4i]" % (
            self.move, self.wins, self.visits, self.avails)

    def TreeToString(self, indent):
        """ Represent the tree as a string, for debugging purposes.
        """
        s = self.IndentString(indent) + str(self)
        for child in self.childNodes:
            s += child.TreeToString(indent + 1)
        return s

    def IndentString(self, indent):
        """ Return a newline followed by `indent` copies of "| ". """
        return "\n" + "| " * max(indent, 0)

    def ChildrenToString(self):
        """ Return one line per direct child of this node. """
        return "".join(str(child) + "\n" for child in self.childNodes)


def ISMCTS(rootstate, itermax, verbose=False):
    """ Conduct an ISMCTS search for itermax iterations starting from
        rootstate. Return the best (most visited) move from the rootstate.

        rootstate -- a GameState; it is never mutated, every iteration works
                     on rootstate.CloneAndRandomize(rootstate.playerToMove).
        itermax   -- number of determinize/select/expand/simulate/backprop
                     iterations.
        verbose   -- print the whole tree instead of only the root's children.

        Raises ValueError if the search produced no root child, i.e. when
        rootstate has no legal moves or itermax is not positive.
    """
    rootnode = Node()

    for _ in range(itermax):
        node = rootnode

        # Determinize
        state = rootstate.CloneAndRandomize(rootstate.playerToMove)

        # Select: descend while the node is fully expanded and non-terminal.
        # The move list is fetched once per step instead of once per use.
        moves = state.GetMoves()
        while moves and not node.GetUntriedMoves(moves):
            node = node.UCBSelectChild(moves)
            state.DoMove(node.move)
            moves = state.GetMoves()

        # Expand
        untriedMoves = node.GetUntriedMoves(moves)
        if untriedMoves:  # if we can expand (i.e. state/node is non-terminal)
            m = random.choice(untriedMoves)
            player = state.playerToMove
            state.DoMove(m)
            node = node.AddChild(m, player)  # add child and descend tree

        # Simulate: random playout until the state is terminal
        moves = state.GetMoves()
        while moves:
            state.DoMove(random.choice(moves))
            moves = state.GetMoves()

        # Backpropagate from the expanded node back to the root node
        while node is not None:
            node.Update(state)
            node = node.parentNode

    # Output some information about the tree - can be omitted
    if verbose:
        print(rootnode.TreeToString(0))
    else:
        print(rootnode.ChildrenToString())

    if not rootnode.childNodes:
        # max() on an empty list would raise the same exception type with a
        # far less helpful message.
        raise ValueError("ISMCTS found no move: the root state has no legal "
                         "moves or itermax is not positive")

    # return the move that was most visited
    return max(rootnode.childNodes, key=lambda child: child.visits).move


def PlayGame():
    """ Play a sample game between five ISMCTS players.
        *** This is only a demo and not used by the actual bot ***
    """
    game = UNOGame(None)
    for number in range(1, 6):
        UNOPlayer(game, "Player %d" % number)

    state = UNOState(game)

    while state.GetMoves() != []:
        print(str(state))
        # The iteration count can be varied per player to compare strengths.
        m = ISMCTS(rootstate=state, itermax=10, verbose=False)
        print("Best Move: " + str(m) + "\n")
        state.DoMove(m)

    someoneWon = False
    for p in game.players:
        if state.GetResult(p) > 0:
            print("Player " + str(p) + " wins!")
            someoneWon = True
    if not someoneWon:
        print("Nobody wins!")


if __name__ == "__main__":
    PlayGame()
" "The code is available here: \n" "https://github.com/jh0ker/mau_mau_bot") +ai_iterations = 100 @run_async def send_async(bot, *args, **kwargs): @@ -92,6 +86,17 @@ def send_async(bot, *args, **kwargs): error(None, None, e) +@run_async +def sticker_async(bot, *args, **kwargs): + if 'timeout' not in kwargs: + kwargs['timeout'] = 2.5 + + try: + bot.sendSticker(*args, **kwargs) + except Exception as e: + error(None, None, e) + + @run_async def answer_async(bot, *args, **kwargs): if 'timeout' not in kwargs: @@ -154,6 +159,29 @@ def join_game(bot, update): reply_to_message_id=update.message.message_id) +def add_ai(bot, update): + """ Handler for the /add_ai command """ + chat_id = update.message.chat_id + if update.message.chat.type == 'private': + help(bot, update) + else: + try: + game = gm.chatid_games[chat_id][-1] + if not game.open: + send_async(bot, chat_id, text="The lobby is closed") + return + else: + Player(game, None, ai=True) + send_async(bot, chat_id, + text="Added computer player", + reply_to_message_id=update.message.message_id) + except (KeyError, IndexError): + send_async(bot, chat_id, + text="No game is running at the moment. " + "Create a new game with /new", + reply_to_message_id=update.message.message_id) + + def leave_game(bot, update): """ Handler for the /leave command """ chat_id = update.message.chat_id @@ -180,6 +208,7 @@ def leave_game(bot, update): text="Okay. Next Player: " + display_name(game.current_player.user), reply_to_message_id=update.message.message_id) + ai_turn(bot, game) else: send_async(bot, chat_id, text="You are not playing in a game in " "this group.", @@ -262,6 +291,7 @@ def start_game(bot, update, args): text="First player: %s\n" "Use /close to stop people from joining the game." 
% display_name(game.current_player.user)) + ai_turn(bot, game) elif len(args) and args[0] == 'select': players = gm.userid_players[update.message.from_user.id] @@ -370,6 +400,7 @@ def skip_player(bot, update): display_name( game.current_player.next.user))) game.turn() + ai_turn(bot, game) return elif len(game.players) > 2: @@ -382,6 +413,7 @@ def skip_player(bot, update): game.current_player.next.user))) gm.leave_game(game.current_player.user, chat_id) + ai_turn(bot, game) return else: send_async(bot, chat_id, @@ -511,6 +543,7 @@ def process_result(bot, update): if game in gm.chatid_games.get(chat_id, list()): send_async(bot, chat_id, text="Next player: " + display_name(game.current_player.user)) + ai_turn(bot, game) def reset_waiting_time(bot, chat_id, player): @@ -555,7 +588,10 @@ def do_draw(game, player): def do_call_bluff(bot, chat_id, game, player): - if player.prev.bluffing: + if player.prev.ai: + send_async(bot, chat_id, text="Computer doesn't know bluffing yet") + return + elif player.prev.bluffing: send_async(bot, chat_id, text="Bluff called! 
Giving %d cards to %s" % (game.draw_counter, player.prev.user.first_name)) @@ -570,24 +606,76 @@ def do_call_bluff(bot, chat_id, game, player): player.cards.append(game.deck.draw()) game.draw_counter = 0 game.turn() + ai_turn(bot, game) -# Add all handlers to the dispatcher and run the bot -dp.addHandler(InlineQueryHandler(reply_to_query)) -dp.addHandler(ChosenInlineResultHandler(process_result)) -dp.addHandler(CallbackQueryHandler(select_game)) -dp.addHandler(CommandHandler('start', start_game, pass_args=True)) -dp.addHandler(CommandHandler('new', new_game)) -dp.addHandler(CommandHandler('join', join_game)) -dp.addHandler(CommandHandler('leave', leave_game)) -dp.addHandler(CommandHandler('open', open_game)) -dp.addHandler(CommandHandler('close', close_game)) -dp.addHandler(CommandHandler('skip', skip_player)) -dp.addHandler(CommandHandler('help', help)) -dp.addHandler(CommandHandler('source', source)) -dp.addHandler(CommandHandler('news', news)) -dp.addHandler(MessageHandler([Filters.status_update], status_update)) -dp.addErrorHandler(error) +def ai_turn(bot, game): + player = game.current_player + while player.ai: + reply = '' -start_bot(u) -u.idle() + from ISMCTS import UNOState, ISMCTS + chat_id = game.chat.id + state = UNOState(game) + move = ISMCTS(state, itermax=ai_iterations, verbose=False) + if move == 'draw': + reply += 'Drawing\n' + else: + sticker_async(bot, chat_id, + sticker=c.STICKERS[str(move)]) + if move.special: + reply += "Choosing color: %s\n" % display_color(move.color) + + state.DoMove(move) + if len(player.cards) == 1: + reply += "UNO!\n" + if len(player.cards) == 0: + reply += "%s won!\n" % player.user.first_name + if len(game.players) < 3: + reply += "Game ended!" 
+ gm.end_game(chat_id, player.next.user) + else: + player.leave() + + player = game.current_player + if game in gm.chatid_games.get(chat_id, list()): + reply += "Next player: " + display_name(player.user) + + send_async(bot, chat_id, text=reply) + + +def set_ai_iterations(bot, update, args): + global ai_iterations + ai_iterations = int(args[0]) + + +if __name__ == '__main__': + gm = GameManager() + u = Updater(token=TOKEN, workers=32) + dp = u.dispatcher + + botan = False + if BOTAN_TOKEN: + botan = Botan(BOTAN_TOKEN) + + # Add all handlers to the dispatcher and run the bot + dp.addHandler(InlineQueryHandler(reply_to_query)) + dp.addHandler(ChosenInlineResultHandler(process_result)) + dp.addHandler(CallbackQueryHandler(select_game)) + dp.addHandler(CommandHandler('start', start_game, pass_args=True)) + dp.addHandler(CommandHandler('new', new_game)) + dp.addHandler(CommandHandler('join', join_game)) + dp.addHandler(CommandHandler('add_ai', add_ai)) + dp.addHandler(CommandHandler('set_ai', set_ai_iterations, pass_args=True)) + dp.addHandler(CommandHandler('leave', leave_game)) + dp.addHandler(CommandHandler('open', open_game)) + dp.addHandler(CommandHandler('close', close_game)) + dp.addHandler(CommandHandler('skip', skip_player)) + dp.addHandler(CommandHandler('help', help)) + dp.addHandler(CommandHandler('source', source)) + dp.addHandler(CommandHandler('news', news)) + dp.addHandler(MessageHandler([Filters.status_update], status_update)) + dp.addErrorHandler(error) + + start_bot(u) + u.idle() diff --git a/card.py b/card.py index aa3b303..5b8d268 100644 --- a/card.py +++ b/card.py @@ -114,7 +114,15 @@ STICKERS = { 'y_skip': 'BQADBAADQwIAAl9XmQABO_AZKtxY6IMC', 'y_reverse': 'BQADBAADQQIAAl9XmQABZdQFahGG6UQC', 'draw_four': 'BQADBAAD9QEAAl9XmQABVlkSNfhn76cC', + 'draw_four_r': 'BQADBAAD9QEAAl9XmQABVlkSNfhn76cC', + 'draw_four_b': 'BQADBAAD9QEAAl9XmQABVlkSNfhn76cC', + 'draw_four_g': 'BQADBAAD9QEAAl9XmQABVlkSNfhn76cC', + 'draw_four_y': 'BQADBAAD9QEAAl9XmQABVlkSNfhn76cC', 
'colorchooser': 'BQADBAAD8wEAAl9XmQABl9rUOPqx4E4C', + 'colorchooser_r': 'BQADBAAD8wEAAl9XmQABl9rUOPqx4E4C', + 'colorchooser_b': 'BQADBAAD8wEAAl9XmQABl9rUOPqx4E4C', + 'colorchooser_g': 'BQADBAAD8wEAAl9XmQABl9rUOPqx4E4C', + 'colorchooser_y': 'BQADBAAD8wEAAl9XmQABl9rUOPqx4E4C', 'option_draw': 'BQADBAADzAIAAl9XmQABTkPaOqA5HIMC', 'option_pass': 'BQADBAADzgIAAl9XmQABWSDq3RIg3c0C', 'option_bluff': 'BQADBAADygIAAl9XmQABJoLfB9ntI2UC', @@ -191,7 +199,10 @@ class Card(object): def __str__(self): if self.special: - return self.special + if self.color: + return '%s_%s' % (self.special, self.color) + else: + return self.special else: return '%s_%s' % (self.color, self.value) @@ -206,7 +217,14 @@ class Card(object): def __eq__(self, other): """ Needed for sorting the cards """ - return str(self) == str(other) + s1 = str(self) + s2 = str(other) + return (s1 == s2 + if not self.special else + s1 == s2 or + s1[:-2] == s2[:-2] or + s1[:-2] == s2 or + s1 == s2[:-2]) def __lt__(self, other): """ Needed for sorting the cards """ diff --git a/deck.py b/deck.py index 0d3b82c..7f9aa65 100644 --- a/deck.py +++ b/deck.py @@ -53,6 +53,8 @@ class Deck(object): """ Draw a card from this deck """ try: card = self.cards.pop() + if card.special: + card = Card(None, None, card.special) self.logger.debug("Drawing card " + str(card)) return card except IndexError: @@ -63,4 +65,6 @@ class Deck(object): def dismiss(self, card): """ All played cards should be returned into the deck """ + # if card.special: + # card.color = None self.graveyard.append(card) diff --git a/game.py b/game.py index 0e4e632..1141c13 100644 --- a/game.py +++ b/game.py @@ -40,7 +40,7 @@ class Game(object): self.last_card = self.deck.draw() while self.last_card.special: - self.deck.dismiss(self.last_card) + self.deck.cards.append(self.last_card) self.deck.shuffle() self.last_card = self.deck.draw() diff --git a/game_manager.py b/game_manager.py index 4e3ca4b..8fa9957 100644 --- a/game_manager.py +++ b/game_manager.py @@ -124,6 
def list_subtract_unsorted(list1, list2):
    """ Return a copy of list1 with one occurrence removed per item of list2.

        Unlike list_subtract, the result is NOT sorted: the remaining items
        keep the order they had in list1. list1 itself is left untouched.

        list1 -- any iterable of items to subtract from.
        list2 -- iterable of items to remove; each item removes the first
                 equal element still present.
        Raises ValueError if an item of list2 is not (or no longer) present.
    """
    # list(...) instead of .copy() so tuples and other iterables work too.
    remaining = list(list1)

    for x in list2:
        try:
            remaining.remove(x)
        except ValueError:
            # Carry the diagnostic details in the exception instead of
            # printing them to stdout.
            raise ValueError(
                "cannot subtract %r: not present in %r (subtracting %r)"
                % (x, remaining, list2))

    return remaining