sábado, 23 de agosto de 2014

Asyncio - Lendo o teclado

Continuando a série sobre o módulo asyncio do Python 3.4, vamos ver como criar um jogo simples, em modo texto. O objetivo do jogo é simplesmente mostrar um labirinto e deixar o jogador se mover utilizando o teclado numérico (4 - esquerda, 6 - direita, 8 - cima, 2 - baixo e S para sair). Para exercitar nossos músculos da época do DOS com programação assíncrona, vamos exibir o relógio na última linha. O resultado final deve se parecer com a imagem abaixo:


Você precisa instalar o colorconsole e o Python 3.4.1 para executar este programa.

from random import shuffle, randrange
from colorconsole import terminal
from concurrent.futures import ThreadPoolExecutor
import datetime
import asyncio

# Colorconsole: https://github.com/lskbr/colorconsole
# Make_make: http://rosettacode.org/wiki/Maze_generation#Python
def make_maze(w = 16, h = 8):
    vis = [[0] * w + [1] for _ in range(h)] + [[1] * (w + 1)]
    ver = [["|  "] * w + ['|'] for _ in range(h)] + [[]]
    hor = [["+--"] * w + ['+'] for _ in range(h + 1)]
 
    def walk(x, y):
        vis[y][x] = 1
 
        d = [(x - 1, y), (x, y + 1), (x + 1, y), (x, y - 1)]
        shuffle(d)
        for (xx, yy) in d:
            if vis[yy][xx]: continue
            if xx == x: hor[max(y, yy)][x] = "+  "
            if yy == y: ver[y][max(x, xx)] = "   "
            walk(xx, yy)
 
    walk(randrange(w), randrange(h))
    maze = []
    for (a, b) in zip(hor, ver):
        maze.append("".join(a))
        maze.append("".join(b))
    return maze
 
class Jogo:
    LARGURA = 24
    ALTURA = 11
    def __init__(self):        
        self.tela = terminal.get_terminal(conEmu=False)
        self.tela.enable_unbuffered_input_mode()
        self.labirinto_cores = (terminal.colors["RED"],terminal.colors["BLACK"])
        self.jogador_carac = (terminal.colors["WHITE"],terminal.colors["BLUE"],'*')
        self.labirinto = make_maze(Jogo.LARGURA,Jogo.ALTURA)
        self.loop = asyncio.get_event_loop()
        self.tpool = ThreadPoolExecutor(max_workers=2)
        while True:
            self.x = randrange(Jogo.LARGURA*3)
            self.y = randrange(Jogo.ALTURA*2)
            if self.pode_mover(self.x, self.y):
                break
        self.jogando = True

    def fim_do_jogo(self):
        self.jogando = False
        self.loop.stop()

    @asyncio.coroutine
    def le_teclado(self):
        while self.jogando:            
            key = yield from self.loop.run_in_executor(self.tpool, self.tela.getch)            
            if(key!=None):
                nx, ny = self.x, self.y
                if key == b'4':
                    if nx > 1:
                        nx-=1
                elif key == b'6':
                    if nx < Jogo.LARGURA*3-1:
                        nx+=1
                elif key == b'8':
                    if ny > 0:
                        ny -=1
                elif key == b'2':
                    if ny < Jogo.ALTURA*2:                        
                        ny +=1
                elif key == b"S":
                    self.fim_do_jogo()
                    break
            if self.pode_mover(nx,ny) and (nx, ny) != (self.x,self.y):
                self.x, self.y = nx, ny
                self.desenha()           

    def pode_mover(self, x,y):
        return self.labirinto[y][x]==' '

    def desenha(self):
        self.tela.set_color(*self.labirinto_cores)
        #self.tela.clear()
        self.tela.gotoXY(0,0)
        self.tela.set_title("Labirinto") 
        for linha in self.labirinto:
            print(linha)
        self.tela.gotoXY(self.x, self.y)
        self.tela.cprint(*self.jogador_carac)

    @asyncio.coroutine
    def relogio(self):
        while self.jogando:
            self.tela.print_at(10,23,datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S"))
            yield from asyncio.sleep(1)

    def execute(self):
        self.tela.clear()
        self.desenha()
        try:
            asyncio.async(self.le_teclado())
            asyncio.async(self.relogio())
            self.loop.run_forever()
        except KeyboardInterrupt:
            print("exit")
        finally:
            self.tpool.shutdown()
            self.loop.close()
            self.tela.restore_buffered_mode()

jogo = Jogo()
jogo.execute()

Para adicionar um pouco de variação ao jogo, eu baixei uma função que gera labirintos deste site. O programa foi modificado para retornar uma lista de strings, que é utilizada para detectar as paredes do labirinto.

O loop de eventos é parecido com o dos artigos anteriores, mas desta vez um ThreadPool está sendo criado. Um thread extra é necessário, pois vamos bloquear a rotina até que uma tecla seja pressionada. Esta construção funciona com várias rotinas bloqueantes que não podem ser utilizadas com asyncio, pois interromperiam a execução de todas as outras corotinas.

    @asyncio.coroutine
    def le_teclado(self):
        while self.jogando:            
            key = yield from self.loop.run_in_executor(self.tpool, self.tela.getch)            
            if(key!=None):
                nx, ny = self.x, self.y
                if key == b'4':
                    if nx > 1:
                        nx-=1
                elif key == b'6':
                    if nx < Jogo.LARGURA*3-1:
                        nx+=1
                elif key == b'8':
                    if ny > 0:
                        ny -=1
                elif key == b'2':
                    if ny < Jogo.ALTURA*2:                        
                        ny +=1
                elif key == b"S":
                    self.fim_do_jogo()
                    break
            if self.pode_mover(nx,ny) and (nx, ny) != (self.x,self.y):
                self.x, self.y = nx, ny
                self.desenha()

Criamos le_teclado como uma corotina, mas como o módulo colorconsole não foi criado para trabalhar com corotinas, precisamos chamar self.tela.getch usando um Executor, no caso, nosso ThreadPool. Desta forma, nossa chamada bloqueante será executada em um thread do ThreadPool e nosso loop de eventos vai continuar a executar normalmente. Quando pressionarmos uma tecla, a função self.tela.getch vai retornar e a partir daí, trabalharemos com o resultado no yield from. O resto do método verifica se a tecla é de movimento ou de saída do jogo. No final, verificamos se o jogador se mexeu ou se a nova posição seria a de uma parede. Caso a posição seja alterada, redesenhamos a tela, para que a nova posição do jogador seja visível.

    @asyncio.coroutine
    def relogio(self):
        while self.jogando:
            self.tela.print_at(10,23,datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S"))
            yield from asyncio.sleep(1)

A corotina relogio exibe a data e hora atuais a cada 1 segundo na tela. Esta execução ocorre mesmo se nada pressionarmos no teclado, confirmado a correta execução de nosso loop de eventos. Este programa pode ser executado para conter inimigos e movê-los em outra corotina, similar a do relógio.

Testado no Windows 8.1 com Python 3.4.1, colorconsole 0.7.1.

Nenhum comentário: