initial commit for AI opponent using ISMCTS

This commit is contained in:
Jannes Höke 2016-05-16 16:55:40 +02:00
parent a6f2c07403
commit 8343737cb4
8 changed files with 500 additions and 34 deletions

332
ISMCTS.py Normal file
View file

@ -0,0 +1,332 @@
# This is a very simple Python 2.7 implementation of the Information Set Monte Carlo Tree Search algorithm.
# The function ISMCTS(rootstate, itermax, verbose = False) is towards the bottom of the code.
# It aims to have the clearest and simplest possible code, and for the sake of clarity, the code
# is orders of magnitude less efficient than it could be made, particularly by using a
# state.GetRandomMove() or state.DoRandomRollout() function.
#
# An example GameState classes for Knockout Whist is included to give some idea of how you
# can write your own GameState to use ISMCTS in your hidden information game.
#
# Written by Peter Cowling, Edward Powley, Daniel Whitehouse (University of York, UK) September 2012 - August 2013.
#
# Licence is granted to freely use and distribute for any sensible/legal purpose so long as this comment
# remains in any distributed code.
#
# For more information about Monte Carlo Tree Search check out our web site at www.mcts.ai
# Also read the article accompanying this code at ***URL HERE***
from math import *
import random, sys
from game import Game as UNOGame
from player import Player as UNOPlayer
from utils import list_subtract_unsorted
import card as c
class GameState:
""" A state of the game, i.e. the game board. These are the only functions which are
absolutely necessary to implement ISMCTS in any imperfect information game,
although they could be enhanced and made quicker, for example by using a
GetRandomMove() function to generate a random move during rollout.
By convention the players are numbered 1, 2, ..., self.numberOfPlayers.
"""
def __init__(self):
pass
def GetNextPlayer(self, p):
""" Return the player to the left of the specified player
"""
raise NotImplementedError()
def Clone(self):
""" Create a deep clone of this game state.
"""
raise NotImplementedError()
def CloneAndRandomize(self, observer):
""" Create a deep clone of this game state, randomizing any information not visible to the specified observer player.
"""
raise NotImplementedError()
def DoMove(self, move):
""" Update a state by carrying out the given move.
Must update playerToMove.
"""
raise NotImplementedError()
def GetMoves(self):
""" Get all possible moves from this state.
"""
raise NotImplementedError()
def GetResult(self, player):
""" Get the game result from the viewpoint of player.
"""
raise NotImplementedError()
def __repr__(self):
""" Don't need this - but good style.
"""
pass
class UNOState(GameState):
""" A state of the game UNO.
"""
def __init__(self, game):
""" Initialise the game state. n is the number of players (from 2 to 7).
"""
self.game = game
@property
def playerToMove(self):
return self.game.current_player
@property
def numberOfPlayers(self):
return len(self.game.players)
def CloneAndRandomize(self, observer):
""" Create a deep clone of this game state.
"""
game = UNOGame(None)
game.deck.cards.append(game.last_card)
game.draw_counter = self.game.draw_counter
game.last_card = self.game.last_card
game.deck.cards = list_subtract_unsorted(game.deck.cards,
self.game.deck.graveyard)
game.deck.graveyard = list(self.game.deck.graveyard)
for player in self.game.players:
p = UNOPlayer(game, None)
if player is observer:
p.cards = list(player.cards)
else:
for i in range(len(player.cards)):
p.cards.append(game.deck.draw())
return UNOState(game)
def DoMove(self, move):
""" Update a state by carrying out the given move.
Must update playerToMove.
"""
if move == 'draw':
for n in range(self.game.draw_counter or 1):
self.game.current_player.cards.append(
self.game.deck.draw()
)
self.game.draw_counter = 0
self.game.turn()
else:
self.game.current_player.cards.remove(move)
self.game.play_card(move)
if move.special:
self.game.turn()
self.game.choosing_color = False
def GetMoves(self):
""" Get all possible moves from this state.
"""
if self.game.current_player.cards:
playable = self.game.current_player.playable_cards()
playable_converted = list()
for card in playable:
if not card.color:
for color in c.COLORS:
playable_converted.append(
c.Card(color, None, card.special)
)
else:
playable_converted.append(card)
# playable_converted.append('draw')
return playable_converted or ['draw']
else:
return list()
def GetResult(self, player):
""" Get the game result from the viewpoint of player.
"""
return 1 if not player.cards else 0
def __repr__(self):
""" Return a human-readable representation of the state
"""
return '\n'.join(
['%s: %s' % (p.user, [str(c) for c in p.cards])
for p in self.game.players]
) + "\nDeck: %s" % str([str(crd) for crd in self.game.deck.cards]) \
+ "\nGrav: %s" % str([str(crd) for crd in self.game.deck.graveyard])
class Node:
""" A node in the game tree. Note wins is always from the viewpoint of playerJustMoved.
"""
def __init__(self, move=None, parent=None, playerJustMoved=None):
self.move = move # the move that got us to this node - "None" for the root node
self.parentNode = parent # "None" for the root node
self.childNodes = []
self.wins = 0
self.visits = 0
self.avails = 1
self.playerJustMoved = playerJustMoved # the only part of the state that the Node needs later
def GetUntriedMoves(self, legalMoves):
""" Return the elements of legalMoves for which this node does not have children.
"""
# Find all moves for which this node *does* have children
triedMoves = [child.move for child in self.childNodes]
# Return all moves that are legal but have not been tried yet
return [move for move in legalMoves if move not in triedMoves]
def UCBSelectChild(self, legalMoves, exploration=0.7):
""" Use the UCB1 formula to select a child node, filtered by the given list of legal moves.
exploration is a constant balancing between exploitation and exploration, with default value 0.7 (approximately sqrt(2) / 2)
"""
# Filter the list of children by the list of legal moves
legalChildren = [child for child in self.childNodes if
child.move in legalMoves]
# Get the child with the highest UCB score
s = max(legalChildren, key=lambda c: float(c.wins) / float(
c.visits) + exploration * sqrt(log(c.avails) / float(c.visits)))
# Update availability counts -- it is easier to do this now than during backpropagation
for child in legalChildren:
child.avails += 1
# Return the child selected above
return s
def AddChild(self, m, p):
""" Add a new child node for the move m.
Return the added child node
"""
n = Node(move=m, parent=self, playerJustMoved=p)
self.childNodes.append(n)
return n
def Update(self, terminalState):
""" Update this node - increment the visit count by one, and increase the win count by the result of terminalState for self.playerJustMoved.
"""
self.visits += 1
if self.playerJustMoved is not None:
self.wins += terminalState.GetResult(self.playerJustMoved)
def __repr__(self):
return "[M:%s W/V/A: %4i/%4i/%4i]" % (
self.move, self.wins, self.visits, self.avails)
def TreeToString(self, indent):
""" Represent the tree as a string, for debugging purposes.
"""
s = self.IndentString(indent) + str(self)
for c in self.childNodes:
s += c.TreeToString(indent + 1)
return s
def IndentString(self, indent):
s = "\n"
for i in range(1, indent + 1):
s += "| "
return s
def ChildrenToString(self):
s = ""
for c in self.childNodes:
s += str(c) + "\n"
return s
def ISMCTS(rootstate, itermax, verbose=False):
""" Conduct an ISMCTS search for itermax iterations starting from rootstate.
Return the best move from the rootstate.
"""
rootnode = Node()
for i in range(itermax):
node = rootnode
# Determinize
state = rootstate.CloneAndRandomize(rootstate.playerToMove)
# Select
while state.GetMoves() != [] and node.GetUntriedMoves(
state.GetMoves()) == []: # node is fully expanded and non-terminal
node = node.UCBSelectChild(state.GetMoves())
state.DoMove(node.move)
# Expand
untriedMoves = node.GetUntriedMoves(state.GetMoves())
if untriedMoves != []: # if we can expand (i.e. state/node is non-terminal)
m = random.choice(untriedMoves)
player = state.playerToMove
state.DoMove(m)
node = node.AddChild(m, player) # add child and descend tree
# Simulate
while state.GetMoves() != []: # while state is non-terminal
state.DoMove(random.choice(state.GetMoves()))
# Backpropagate
while node != None: # backpropagate from the expanded node and work back to the root node
node.Update(state)
node = node.parentNode
# Output some information about the tree - can be omitted
if (verbose):
print(rootnode.TreeToString(0))
else:
print(rootnode.ChildrenToString())
return max(rootnode.childNodes, key=lambda
c: c.visits).move # return the move that was most visited
def PlayGame():
""" Play a sample game between two ISMCTS players.
*** This is only a demo and not used by the actual bot ***
"""
game = UNOGame(None)
me = UNOPlayer(game, "Player 1")
UNOPlayer(game, "Player 2")
UNOPlayer(game, "Player 3")
UNOPlayer(game, "Player 4")
UNOPlayer(game, "Player 5")
state = UNOState(game)
while (state.GetMoves() != []):
print(str(state))
# Use different numbers of iterations (simulations, tree nodes) for different players
m = ISMCTS(rootstate=state, itermax=10, verbose=False)
# if state.playerToMove is me:
# m = ISMCTS(rootstate=state, itermax=1000, verbose=False)
# else:
# m = ISMCTS(rootstate=state, itermax=100, verbose=False)
print("Best Move: " + str(m) + "\n")
state.DoMove(m)
someoneWon = False
for p in game.players:
if state.GetResult(p) > 0:
print("Player " + str(p) + " wins!")
someoneWon = True
if not someoneWon:
print("Nobody wins!")
if __name__ == "__main__":
PlayGame()

144
bot.py
View file

@ -34,20 +34,13 @@ from credentials import TOKEN, BOTAN_TOKEN
from start_bot import start_bot
from results import *
from utils import *
from player import Player # for ai players
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.DEBUG)
level=logging.WARNING)
logger = logging.getLogger(__name__)
gm = GameManager()
u = Updater(token=TOKEN, workers=32)
dp = u.dispatcher
botan = False
if BOTAN_TOKEN:
botan = Botan(BOTAN_TOKEN)
help_text = ("Follow these steps:\n\n"
"1. Add this bot to a group\n"
"2. In the group, start a new game with /new or join an already"
@ -80,6 +73,7 @@ source_text = ("This bot is Free Software and licensed under the AGPL. "
"The code is available here: \n"
"https://github.com/jh0ker/mau_mau_bot")
ai_iterations = 100
@run_async
def send_async(bot, *args, **kwargs):
@ -92,6 +86,17 @@ def send_async(bot, *args, **kwargs):
error(None, None, e)
@run_async
def sticker_async(bot, *args, **kwargs):
if 'timeout' not in kwargs:
kwargs['timeout'] = 2.5
try:
bot.sendSticker(*args, **kwargs)
except Exception as e:
error(None, None, e)
@run_async
def answer_async(bot, *args, **kwargs):
if 'timeout' not in kwargs:
@ -154,6 +159,29 @@ def join_game(bot, update):
reply_to_message_id=update.message.message_id)
def add_ai(bot, update):
""" Handler for the /add_ai command """
chat_id = update.message.chat_id
if update.message.chat.type == 'private':
help(bot, update)
else:
try:
game = gm.chatid_games[chat_id][-1]
if not game.open:
send_async(bot, chat_id, text="The lobby is closed")
return
else:
Player(game, None, ai=True)
send_async(bot, chat_id,
text="Added computer player",
reply_to_message_id=update.message.message_id)
except (KeyError, IndexError):
send_async(bot, chat_id,
text="No game is running at the moment. "
"Create a new game with /new",
reply_to_message_id=update.message.message_id)
def leave_game(bot, update):
""" Handler for the /leave command """
chat_id = update.message.chat_id
@ -180,6 +208,7 @@ def leave_game(bot, update):
text="Okay. Next Player: " +
display_name(game.current_player.user),
reply_to_message_id=update.message.message_id)
ai_turn(bot, game)
else:
send_async(bot, chat_id, text="You are not playing in a game in "
"this group.",
@ -262,6 +291,7 @@ def start_game(bot, update, args):
text="First player: %s\n"
"Use /close to stop people from joining the game."
% display_name(game.current_player.user))
ai_turn(bot, game)
elif len(args) and args[0] == 'select':
players = gm.userid_players[update.message.from_user.id]
@ -370,6 +400,7 @@ def skip_player(bot, update):
display_name(
game.current_player.next.user)))
game.turn()
ai_turn(bot, game)
return
elif len(game.players) > 2:
@ -382,6 +413,7 @@ def skip_player(bot, update):
game.current_player.next.user)))
gm.leave_game(game.current_player.user, chat_id)
ai_turn(bot, game)
return
else:
send_async(bot, chat_id,
@ -511,6 +543,7 @@ def process_result(bot, update):
if game in gm.chatid_games.get(chat_id, list()):
send_async(bot, chat_id, text="Next player: " +
display_name(game.current_player.user))
ai_turn(bot, game)
def reset_waiting_time(bot, chat_id, player):
@ -555,7 +588,10 @@ def do_draw(game, player):
def do_call_bluff(bot, chat_id, game, player):
if player.prev.bluffing:
if player.prev.ai:
send_async(bot, chat_id, text="Computer doesn't know bluffing yet")
return
elif player.prev.bluffing:
send_async(bot, chat_id, text="Bluff called! Giving %d cards to %s"
% (game.draw_counter,
player.prev.user.first_name))
@ -570,24 +606,76 @@ def do_call_bluff(bot, chat_id, game, player):
player.cards.append(game.deck.draw())
game.draw_counter = 0
game.turn()
ai_turn(bot, game)
# Add all handlers to the dispatcher and run the bot
dp.addHandler(InlineQueryHandler(reply_to_query))
dp.addHandler(ChosenInlineResultHandler(process_result))
dp.addHandler(CallbackQueryHandler(select_game))
dp.addHandler(CommandHandler('start', start_game, pass_args=True))
dp.addHandler(CommandHandler('new', new_game))
dp.addHandler(CommandHandler('join', join_game))
dp.addHandler(CommandHandler('leave', leave_game))
dp.addHandler(CommandHandler('open', open_game))
dp.addHandler(CommandHandler('close', close_game))
dp.addHandler(CommandHandler('skip', skip_player))
dp.addHandler(CommandHandler('help', help))
dp.addHandler(CommandHandler('source', source))
dp.addHandler(CommandHandler('news', news))
dp.addHandler(MessageHandler([Filters.status_update], status_update))
dp.addErrorHandler(error)
def ai_turn(bot, game):
player = game.current_player
while player.ai:
reply = ''
start_bot(u)
u.idle()
from ISMCTS import UNOState, ISMCTS
chat_id = game.chat.id
state = UNOState(game)
move = ISMCTS(state, itermax=ai_iterations, verbose=False)
if move == 'draw':
reply += 'Drawing\n'
else:
sticker_async(bot, chat_id,
sticker=c.STICKERS[str(move)])
if move.special:
reply += "Choosing color: %s\n" % display_color(move.color)
state.DoMove(move)
if len(player.cards) == 1:
reply += "UNO!\n"
if len(player.cards) == 0:
reply += "%s won!\n" % player.user.first_name
if len(game.players) < 3:
reply += "Game ended!"
gm.end_game(chat_id, player.next.user)
else:
player.leave()
player = game.current_player
if game in gm.chatid_games.get(chat_id, list()):
reply += "Next player: " + display_name(player.user)
send_async(bot, chat_id, text=reply)
def set_ai_iterations(bot, update, args):
global ai_iterations
ai_iterations = int(args[0])
if __name__ == '__main__':
gm = GameManager()
u = Updater(token=TOKEN, workers=32)
dp = u.dispatcher
botan = False
if BOTAN_TOKEN:
botan = Botan(BOTAN_TOKEN)
# Add all handlers to the dispatcher and run the bot
dp.addHandler(InlineQueryHandler(reply_to_query))
dp.addHandler(ChosenInlineResultHandler(process_result))
dp.addHandler(CallbackQueryHandler(select_game))
dp.addHandler(CommandHandler('start', start_game, pass_args=True))
dp.addHandler(CommandHandler('new', new_game))
dp.addHandler(CommandHandler('join', join_game))
dp.addHandler(CommandHandler('add_ai', add_ai))
dp.addHandler(CommandHandler('set_ai', set_ai_iterations, pass_args=True))
dp.addHandler(CommandHandler('leave', leave_game))
dp.addHandler(CommandHandler('open', open_game))
dp.addHandler(CommandHandler('close', close_game))
dp.addHandler(CommandHandler('skip', skip_player))
dp.addHandler(CommandHandler('help', help))
dp.addHandler(CommandHandler('source', source))
dp.addHandler(CommandHandler('news', news))
dp.addHandler(MessageHandler([Filters.status_update], status_update))
dp.addErrorHandler(error)
start_bot(u)
u.idle()

22
card.py
View file

@ -114,7 +114,15 @@ STICKERS = {
'y_skip': 'BQADBAADQwIAAl9XmQABO_AZKtxY6IMC',
'y_reverse': 'BQADBAADQQIAAl9XmQABZdQFahGG6UQC',
'draw_four': 'BQADBAAD9QEAAl9XmQABVlkSNfhn76cC',
'draw_four_r': 'BQADBAAD9QEAAl9XmQABVlkSNfhn76cC',
'draw_four_b': 'BQADBAAD9QEAAl9XmQABVlkSNfhn76cC',
'draw_four_g': 'BQADBAAD9QEAAl9XmQABVlkSNfhn76cC',
'draw_four_y': 'BQADBAAD9QEAAl9XmQABVlkSNfhn76cC',
'colorchooser': 'BQADBAAD8wEAAl9XmQABl9rUOPqx4E4C',
'colorchooser_r': 'BQADBAAD8wEAAl9XmQABl9rUOPqx4E4C',
'colorchooser_b': 'BQADBAAD8wEAAl9XmQABl9rUOPqx4E4C',
'colorchooser_g': 'BQADBAAD8wEAAl9XmQABl9rUOPqx4E4C',
'colorchooser_y': 'BQADBAAD8wEAAl9XmQABl9rUOPqx4E4C',
'option_draw': 'BQADBAADzAIAAl9XmQABTkPaOqA5HIMC',
'option_pass': 'BQADBAADzgIAAl9XmQABWSDq3RIg3c0C',
'option_bluff': 'BQADBAADygIAAl9XmQABJoLfB9ntI2UC',
@ -191,7 +199,10 @@ class Card(object):
def __str__(self):
if self.special:
return self.special
if self.color:
return '%s_%s' % (self.special, self.color)
else:
return self.special
else:
return '%s_%s' % (self.color, self.value)
@ -206,7 +217,14 @@ class Card(object):
def __eq__(self, other):
""" Needed for sorting the cards """
return str(self) == str(other)
s1 = str(self)
s2 = str(other)
return (s1 == s2
if not self.special else
s1 == s2 or
s1[:-2] == s2[:-2] or
s1[:-2] == s2 or
s1 == s2[:-2])
def __lt__(self, other):
""" Needed for sorting the cards """

View file

@ -53,6 +53,8 @@ class Deck(object):
""" Draw a card from this deck """
try:
card = self.cards.pop()
if card.special:
card = Card(None, None, card.special)
self.logger.debug("Drawing card " + str(card))
return card
except IndexError:
@ -63,4 +65,6 @@ class Deck(object):
def dismiss(self, card):
""" All played cards should be returned into the deck """
# if card.special:
# card.color = None
self.graveyard.append(card)

View file

@ -40,7 +40,7 @@ class Game(object):
self.last_card = self.deck.draw()
while self.last_card.special:
self.deck.dismiss(self.last_card)
self.deck.cards.append(self.last_card)
self.deck.shuffle()
self.last_card = self.deck.draw()

View file

@ -124,6 +124,8 @@ class GameManager(object):
return
for player in the_game.players:
if player.ai:
continue
this_users_players = self.userid_players[player.user.id]
this_users_players.remove(player)
if len(this_users_players) is 0:

View file

@ -20,6 +20,8 @@
import logging
from datetime import datetime
from telegram import User
import card as c
@ -31,7 +33,7 @@ class Player(object):
other players by placing itself behind the current player.
"""
def __init__(self, game, user):
def __init__(self, game, user, ai=False):
self.cards = list()
self.game = game
self.user = user
@ -56,6 +58,10 @@ class Player(object):
self.anti_cheat = 0
self.turn_started = datetime.now()
self.waiting_time = 90
self.ai = ai
if ai and not user:
self.user = User(-1, "Computer")
def leave(self):
""" Leave the current game """
@ -73,10 +79,10 @@ class Player(object):
self.cards = list()
def __repr__(self):
return repr(self.user)
return repr(self.user) if not self.ai else "computer"
def __str__(self):
return str(self.user)
return str(self.user) if not self.ai else "Computer"
@property
def next(self):

View file

@ -30,6 +30,22 @@ def list_subtract(list1, list2):
return list(sorted(list1))
def list_subtract_unsorted(list1, list2):
""" Helper function to subtract two lists and return the sorted result """
list1 = list1.copy()
for x in list2:
try:
list1.remove(x)
except ValueError:
print(list1)
print(list2)
print(x)
raise
return list1
def display_name(user):
""" Get the current players name including their username, if possible """
user_name = user.first_name