sábado, 30 de agosto de 2014

Servidor de Chat com websockets e asyncio

Já é hora de escrever sobre um projeto mais completo. Aproveito para mostrar o módulo websockets para Python 3.4 que funciona muito bem com asyncio. Para não ter problemas de interface, eu resolvi escrever o cliente do chat em JavaScript. O cliente de exemplo foi baixado daqui. Como quase sempre, os exemplos são muito simples e nos deixam com água na boca sobre o que poderíamos realmente fazer. Quem já tentou escrever um chat em JavaScript sabe que WebSockets são uma mão na roda.

Este artigo faz parte de uma série que escrevo sobre o asyncio do Python 3.4. Você pode ler os outros artigos clicando aqui: Python e asyncio, Asyncio e corotinas e o Lendo o teclado.

A ideia de usarmos WebSockets visa demonstrar a facilidade do módulo websockets que deve ser instalado no Python 3.4:

pip install websockets

Uma vez instalado o módulo, podemos criar um servidor, como o mostrado na documentação do módulo:

import asyncio
import websockets

@asyncio.coroutine
def hello(websocket, path):
    name = yield from websocket.recv()
    print("< {}".format(name))
    greeting = "Hello {}!".format(name)
    yield from websocket.send(greeting)
    print("> {}".format(greeting))

start_server = websockets.serve(hello, 'localhost', 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

Execute o servidor com:
py -3 server.py

Para testar este programa, precisaremos de uma página com nosso cliente de chat. Eu preparei uma página com todo o código html e javascript necessário. Baixe a página cliente.html aqui. Salve o arquivo e abra-o com seu browser preferido: Chrome, Firefox ou o IE (>10).

Como o servidor é muito simples, tudo que podemos fazer é enviar uma mensagem e recebê-la de volta, já que é um servidor do tipo Echo (eco). Veja também que o servidor de WebSockets é inicializado como nossos outros servidores com o módulo asyncio, mas que este não obedece uma interface com métodos definidos para conexão, recebimento de dados etc. O objeto retornado pelo módulo websockets é um objeto com o cliente já conectado. Vejamos primeiramente como criar o servidor:

start_server = websockets.serve(hello, 'localhost', 8765)

A linha acima cria nosso servidor, chamando a função hello sempre que um novo cliente for executado. O nome localhost se refere a nosso computador e 8765 é a porta que utilizaremos para receber as conexões. Vejamos a função hello. Quando uma nova conexão for recebida, a função hello será chamado com dois parâmetros: o primeiro é o cliente já conectado e pronto para ser utilizado; e o segundo é o path ou o caminho usado (veremos isso depois em outro artigo). Na realidade, a corotina hello é responsável pelo tempo de vida e gestão da conexão do cliente. Quando a corotina hello termina, o cliente é desconectado. Veja também que usamos o yield from para enviar e receber dados. O uso do yield from permite que escrevamos nosso código como se sua execução fosse sequencial, como já discutimos nos outros artigos.

Um detalhe importante a notar é que a interface do módulo websockets já entrega os dados no formato de mensagem (como definido pelo protocolo). Diferentemente de um socket TCP/IP comum que trabalha com streams, entregando bytes. Quando o método recv retorna, uma mensagem inteira foi recebida, pouca importa quantos read foram feitos para completar esta tarefa. Esta característica vai facilitar muito a implementação do servidor de chat, uma vez que não precisaremos inventar um delimitador de mensagem, nem separar as mensagens manualmente em nosso código.

Em relação a nosso servidor de chat, o servidor de exemplo é bem limitado. A maior limitação é não permitir a comunicação entre clientes. A ideia do servidor de chat é enviar mensagens a todos os clientes conectados. Desta forma, o servidor deve ser informado sobre e registrar todas as conexões e desconexões do sistema. Vamos manter a lista dos clientes conectados com uma lista. Nossos clientes serão controlados por uma classe Cliente que veremos mais tarde. Observe a implementação parcial da classe Servidor:

class Servidor:
    def __init__(self):
        self.conectados = []
    
    @property
    def nconectados(self):
        return len(self.conectados)
    
    @asyncio.coroutine
    def conecta(self, websocket, path):
        cliente = Cliente(self, websocket, path)
        if cliente not in self.conectados:
            self.conectados.append(cliente)
            print("Novo cliente conectado. Total: {0}".format(self.nconectados))
        yield from cliente.gerencia()

    def desconecta(self, cliente):
        if cliente in self.conectados:
            self.conectados.remove(cliente)
        print("Cliente {1} desconectado. Total: {0}".format(self.nconectados, cliente.nome))

Veja que apenas conecta é uma corotina. Na realidade, todo o trabalho é feito na classe Cliente, que disponibiliza o método gerencia como uma corotina. O importante agora é entender a manutenção da lista de conexões ativas.

Vejamos a classe Cliente (parcial):

class Cliente:    
    def __init__(self, servidor, websocket, path):
        self.cliente = websocket
        self.servidor = servidor
        self.nome = None        
    
    @property
    def conectado(self):
        return self.cliente.open

    @asyncio.coroutine
    def gerencia(self):
        try:
            yield from self.envia("Bem vindo ao servidor de chat escrito em Python 3.4 com asyncio e WebSockets. Identifique-se com /nome SeuNome")
            while True:
                mensagem = yield from self.recebe()
                if mensagem:
                    print("{0} < {1}".format(self.nome, mensagem))
                    yield from self.processa_comandos(mensagem)                                            
                else:
                    break
        except Exception:
            print("Erro")
            raise        
        finally:
            self.servidor.desconecta(self)

Como cada cliente tem seu próprio websocket e precisa se comunicar com o servidor, guardaremos estas referências como atributos. Já preparamos também a gestão de nomes, embora tenhamos inicializado o nome do Cliente com None. O método gerencia, que é uma corotina, envia uma mensagem de boas vindas ao cliente e como no exemplo anterior, utiliza yield from para realizar o envio no loop de eventos. Uma vez que a mensagem inicial é enviada, entramos em um loop infinito que espera uma mensagem do cliente. Quando a conexão é fechada ou acontece um erro, o valor de mensagem é igual a None, por isso, testamos o valor de mensagem para sair do loop infinito criado pelo while True. Da mesma forma que no primeiro servidor de exemplo, nosso cliente é desconectado quando a corotina gerencia termina. Aproveitamos o fim da corotina para informar ao servidor que este cliente está se desconectado.

Ao recebermos uma mensagem, iniciamos o processamento da mesma, utilizando o método corotina processa_comandos da classe Cliente:
    @asyncio.coroutine
    def processa_comandos(self, mensagem):        
        if mensagem.strip().startswith("/"):
            comandos=shlex.split(mensagem.strip()[1:])
            if len(comandos)==0:
                yield from self.envia("Comando inválido")
                return
            print(comandos)
            comando = comandos[0].lower()            
            if comando == "horas":
                yield from self.envia("Hora atual: " + time.strftime("%H:%M:%S"))
            elif comando == "data":
                yield from self.envia("Data atual: " + time.strftime("%d/%m/%y"))
            elif comando == "clientes":
                yield from self.envia("{0} clientes conectados".format(self.servidor.nconectados))
            elif comando == "nome":
                yield from self.altera_nome(comandos)
            elif comando == "apenas":
                yield from self.apenas_para(comandos)
            else:
                yield from self.envia("Comando desconhecido")
        else:
            if self.nome:
                yield from self.servidor.envia_a_todos(self, mensagem)
            else:
                yield from self.envia("Identifique-se para enviar mensagens. Use o comando /nome SeuNome")

Lembrando os bons velhos tempos do IRC, o método processa_comandos reconhece comandos iniciados pela barra /. Desta forma, caso um cliente envie para o servidor /horas, este retornará a hora atual do servidor. Implementamos também os comandos:


  • /data que envia a data atual;
  • /clientes que envia quantos clientes estão conectados ao servidor, 
  • /nome e /apenas que veremos mais adiante. 


Utilizamos o módulo shlex para simplificar o processamento dos comandos, uma vez que a função shlex.split permite processar uma linha de texto como uma linha de comandos do bash, reconhecendo valores entre aspas e retirando os espaços em branco entre os parâmetros. Caso o usuário envie uma mensagem que não se inicia por uma barra, esta mensagem será enviada a todos os outros usuários conectados.

Para melhorar nosso chat, utilizamos o comando /nome para configurar nosso nome. O servidor cuida para que apenas um usuário utilize cada nome, retornando uma mensagem de erro, caso o nome desejado já esteja em uso. Este comando é processado pelo método altera_nome da classe Cliente:

    @asyncio.coroutine
    def altera_nome(self, comandos):                
        if len(comandos)>1 and self.servidor.verifica_nome(comandos[1]):
            self.nome = comandos[1]
            yield from self.envia("Nome alterado com sucesso para {0}".format(self.nome))
        else:
            yield from self.envia("Nome em uso ou inválido. Escolha um outro.")

O método altera_nome simplesmente verifica se passamos um parâmetro depois do comando /nome, pois comandos é uma lista onde cada elemento é um parâmetro (mas o primeiro é o nome do comando em si). Usando o método verifica_nome do servidor, checamos se o nome é único e enviamos uma mensagem de confirmação ou de erro dependendo do resultado. O método verifica_nome da classe Servidor é apresentado abaixo:

    def verifica_nome(self, nome):
        for cliente in self.conectados:
            if cliente.nome and cliente.nome == nome:
                return False
        return True

A verificação percorre toda a lista com os clientes conectados e verifica se um nome igual já foi registrado. Caso não encontre o nome na lista dos clientes já conectados, retorna True, permitindo o registro do nome pelo cliente que o solicitou.

Um outro comando interessante é o  /apenas que permite enviarmos uma mensagem apenas para determinado cliente. Vejamos a implementação do método apenas_para no cliente:

    @asyncio.coroutine
    def apenas_para(self, comandos):
        if len(comandos)<3:
            yield from self.envia("Comando incorreto. /apenas Destinatário mensagem")
            return
        destinatario = comandos[1]
        mensagem = " ".join(comandos[2:])
        enviado = yield from self.servidor.envia_a_destinatario(self, mensagem, destinatario)
        if not enviado:
            yield from self.envia("Destinatário {0} não encontrado. Mensagem não enviada.".format(destinatario))

E do método que realiza o envio na classe Servidor:

    @asyncio.coroutine
    def envia_a_destinatario(self, origem, mensagem, destinatario):        
        for cliente in self.conectados:            
            if cliente.nome == destinatario and origem != cliente and cliente.conectado:
                print("Enviando de <{0}> para <{1}>: {2}".format(origem.nome, cliente.nome, mensagem))
                yield from cliente.envia("PRIVADO de {0} >> {1}".format(origem.nome, mensagem))
                return True
        return False

Um outro método importante é o que envia uma mensagem a todos os clientes conectados:

    @asyncio.coroutine
    def envia_a_todos(self, origem, mensagem):
        print("Enviando a todos")
        for cliente in self.conectados:            
            if origem != cliente and cliente.conectado:
                print("Enviando de <{0}> para <{1}>: {2}".format(origem.nome, cliente.nome, mensagem))
                yield from cliente.envia("{0} >> {1}".format(origem.nome, mensagem))

Veja que enviamos a mensagem a todos da lista, mas que tomamos o cuidado para não enviar a mensagem ao mesmo cliente que a enviou, pois esta seria impressa uma segunda vez e nosso cliente Javascript já fez este trabalho por nós. A listagem completa abaixo:

import asyncio
import websockets
import time
import shlex

class Servidor:
    def __init__(self):
        self.conectados = []
    
    @property
    def nconectados(self):
        return len(self.conectados)
    
    @asyncio.coroutine
    def conecta(self, websocket, path):
        cliente = Cliente(self, websocket, path)
        if cliente not in self.conectados:
            self.conectados.append(cliente)
            print("Novo cliente conectado. Total: {0}".format(self.nconectados))            
        yield from cliente.gerencia()

    def desconecta(self, cliente):
        if cliente in self.conectados:
            self.conectados.remove(cliente)
        print("Cliente {1} desconectado. Total: {0}".format(self.nconectados, cliente.nome))            

    @asyncio.coroutine
    def envia_a_todos(self, origem, mensagem):
        print("Enviando a todos")
        for cliente in self.conectados:            
            if origem != cliente and cliente.conectado:
                print("Enviando de <{0}> para <{1}>: {2}".format(origem.nome, cliente.nome, mensagem))
                yield from cliente.envia("{0} >> {1}".format(origem.nome, mensagem))

    @asyncio.coroutine
    def envia_a_destinatario(self, origem, mensagem, destinatario):        
        for cliente in self.conectados:            
            if cliente.nome == destinatario and origem != cliente and cliente.conectado:
                print("Enviando de <{0}> para <{1}>: {2}".format(origem.nome, cliente.nome, mensagem))
                yield from cliente.envia("PRIVADO de {0} >> {1}".format(origem.nome, mensagem))
                return True
        return False

    def verifica_nome(self, nome):
        for cliente in self.conectados:
            if cliente.nome and cliente.nome == nome:
                return False
        return True


class Cliente:    
    def __init__(self, servidor, websocket, path):
        self.cliente = websocket
        self.servidor = servidor
        self.nome = None        
    
    @property
    def conectado(self):
        return self.cliente.open

    @asyncio.coroutine
    def gerencia(self):
        try:
            yield from self.envia("Bem vindo ao servidor de chat escrito em Python 3.4 com asyncio e WebSockets. Identifique-se com /nome SeuNome")
            while True:
                mensagem = yield from self.recebe()
                if mensagem:
                    print("{0} < {1}".format(self.nome, mensagem))
                    yield from self.processa_comandos(mensagem)                                            
                else:
                    break
        except Exception:
            print("Erro")
            raise        
        finally:
            self.servidor.desconecta(self)

    @asyncio.coroutine
    def envia(self, mensagem):
        yield from self.cliente.send(mensagem)

    @asyncio.coroutine
    def recebe(self):
        mensagem = yield from self.cliente.recv()
        return mensagem

    @asyncio.coroutine
    def processa_comandos(self, mensagem):        
        if mensagem.strip().startswith("/"):
            comandos=shlex.split(mensagem.strip()[1:])
            if len(comandos)==0:
                yield from self.envia("Comando inválido")
                return
            print(comandos)
            comando = comandos[0].lower()            
            if comando == "horas":
                yield from self.envia("Hora atual: " + time.strftime("%H:%M:%S"))
            elif comando == "data":
                yield from self.envia("Data atual: " + time.strftime("%d/%m/%y"))
            elif comando == "clientes":
                yield from self.envia("{0} clientes conectados".format(self.servidor.nconectados))
            elif comando == "nome":
                yield from self.altera_nome(comandos)
            elif comando == "apenas":
                yield from self.apenas_para(comandos)
            else:
                yield from self.envia("Comando desconhecido")
        else:
            if self.nome:
                yield from self.servidor.envia_a_todos(self, mensagem)
            else:
                yield from self.envia("Identifique-se para enviar mensagens. Use o comando /nome SeuNome")

    @asyncio.coroutine
    def altera_nome(self, comandos):                
        if len(comandos)>1 and self.servidor.verifica_nome(comandos[1]):
            self.nome = comandos[1]
            yield from self.envia("Nome alterado com sucesso para {0}".format(self.nome))
        else:
            yield from self.envia("Nome em uso ou inválido. Escolha um outro.")

    @asyncio.coroutine
    def apenas_para(self, comandos):
        if len(comandos)<3:
            yield from self.envia("Comando incorreto. /apenas Destinatário mensagem")
            return
        destinatario = comandos[1]
        mensagem = " ".join(comandos[2:])
        enviado = yield from self.servidor.envia_a_destinatario(self, mensagem, destinatario)
        if not enviado:
            yield from self.envia("Destinatário {0} não encontrado. Mensagem não enviada.".format(destinatario))



servidor=Servidor()
loop=asyncio.get_event_loop()

start_server = websockets.serve(servidor.conecta, 'localhost', 8765)

try:
    loop.run_until_complete(start_server)
    loop.run_forever()
finally:
    start_server.close()

Você pode baixar o arquivo completo clicando aqui.

Para testar, abra o arquivo cliente.html que você já deve ter salvo em seu computador. Se você já ativou o servidor com

py -3 servidor2.py

nosso cliente já deve estar conectado. Caso contrário, recarregue a página no navegador para forçar a reconexão. Quando conectado, o cliente exibe uma barra azul no topo da página. Para simular vários clientes, abra várias vezes o arquivo cliente.html. Vejamos uma sessão simples com 3 clientes:

No primeiro cliente:
/nome Cliente1
/horas

No segundo cliente:
/nome Cliente2
/data

No terceiro cliente:
/nome Cliente3
/apenas Cliente1 Olá 1
/apenas Cliente2 Olá 2
Olá todos!

Não esqueça de copiar linha por linha nas respectivas janelas. Digite ENTER para enviar a mensagem, uma de cada vez. Divirta-se!


sábado, 23 de agosto de 2014

Asyncio - Lendo o teclado

Continuando a série sobre o módulo asyncio do Python 3.4, vamos ver como criar um jogo simples, em modo texto. O objetivo do jogo é simplesmente mostrar um labirinto e deixar o jogador se mover utilizando o teclado numérico (4 - esquerda, 6 - direita, 8 - cima, 2 - baixo e S para sair). Para exercitar nossos músculos da época do DOS com programação assíncrona, vamos exibir o relógio na última linha. O resultado final deve se parecer com a imagem abaixo:


Você precisa instalar o colorconsole e o Python 3.4.1 para executar este programa.

from random import shuffle, randrange
from colorconsole import terminal
from concurrent.futures import ThreadPoolExecutor
import datetime
import asyncio

# Colorconsole: https://github.com/lskbr/colorconsole
# Make_make: http://rosettacode.org/wiki/Maze_generation#Python
def make_maze(w = 16, h = 8):
    vis = [[0] * w + [1] for _ in range(h)] + [[1] * (w + 1)]
    ver = [["|  "] * w + ['|'] for _ in range(h)] + [[]]
    hor = [["+--"] * w + ['+'] for _ in range(h + 1)]
 
    def walk(x, y):
        vis[y][x] = 1
 
        d = [(x - 1, y), (x, y + 1), (x + 1, y), (x, y - 1)]
        shuffle(d)
        for (xx, yy) in d:
            if vis[yy][xx]: continue
            if xx == x: hor[max(y, yy)][x] = "+  "
            if yy == y: ver[y][max(x, xx)] = "   "
            walk(xx, yy)
 
    walk(randrange(w), randrange(h))
    maze = []
    for (a, b) in zip(hor, ver):
        maze.append("".join(a))
        maze.append("".join(b))
    return maze
 
class Jogo:
    LARGURA = 24
    ALTURA = 11
    def __init__(self):        
        self.tela = terminal.get_terminal(conEmu=False)
        self.tela.enable_unbuffered_input_mode()
        self.labirinto_cores = (terminal.colors["RED"],terminal.colors["BLACK"])
        self.jogador_carac = (terminal.colors["WHITE"],terminal.colors["BLUE"],'*')
        self.labirinto = make_maze(Jogo.LARGURA,Jogo.ALTURA)
        self.loop = asyncio.get_event_loop()
        self.tpool = ThreadPoolExecutor(max_workers=2)
        while True:
            self.x = randrange(Jogo.LARGURA*3)
            self.y = randrange(Jogo.ALTURA*2)
            if self.pode_mover(self.x, self.y):
                break
        self.jogando = True

    def fim_do_jogo(self):
        self.jogando = False
        self.loop.stop()

    @asyncio.coroutine
    def le_teclado(self):
        while self.jogando:            
            key = yield from self.loop.run_in_executor(self.tpool, self.tela.getch)            
            if(key!=None):
                nx, ny = self.x, self.y
                if key == b'4':
                    if nx > 1:
                        nx-=1
                elif key == b'6':
                    if nx < Jogo.LARGURA*3-1:
                        nx+=1
                elif key == b'8':
                    if ny > 0:
                        ny -=1
                elif key == b'2':
                    if ny < Jogo.ALTURA*2:                        
                        ny +=1
                elif key == b"S":
                    self.fim_do_jogo()
                    break
            if self.pode_mover(nx,ny) and (nx, ny) != (self.x,self.y):
                self.x, self.y = nx, ny
                self.desenha()           

    def pode_mover(self, x,y):
        return self.labirinto[y][x]==' '

    def desenha(self):
        self.tela.set_color(*self.labirinto_cores)
        #self.tela.clear()
        self.tela.gotoXY(0,0)
        self.tela.set_title("Labirinto") 
        for linha in self.labirinto:
            print(linha)
        self.tela.gotoXY(self.x, self.y)
        self.tela.cprint(*self.jogador_carac)

    @asyncio.coroutine
    def relogio(self):
        while self.jogando:
            self.tela.print_at(10,23,datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S"))
            yield from asyncio.sleep(1)

    def execute(self):
        self.tela.clear()
        self.desenha()
        try:
            asyncio.async(self.le_teclado())
            asyncio.async(self.relogio())
            self.loop.run_forever()
        except KeyboardInterrupt:
            print("exit")
        finally:
            self.tpool.shutdown()
            self.loop.close()
            self.tela.restore_buffered_mode()

jogo = Jogo()
jogo.execute()

Para adicionar um pouco de variação ao jogo, eu baixei uma função que gera labirintos deste site. O programa foi modificado para retornar uma lista de strings, que é utilizada para detectar as paredes do labirinto.

O loop de eventos é parecido com o dos artigos anteriores, mas desta vez um ThreadPool está sendo criado. Um thread extra é necessário, pois vamos bloquear a rotina até que uma tecla seja pressionada. Esta construção funciona com várias rotinas bloqueantes que não podem ser utilizadas com asyncio, pois interromperiam a execução de todas as outras corotinas.

    @asyncio.coroutine
    def le_teclado(self):
        while self.jogando:            
            key = yield from self.loop.run_in_executor(self.tpool, self.tela.getch)            
            if(key!=None):
                nx, ny = self.x, self.y
                if key == b'4':
                    if nx > 1:
                        nx-=1
                elif key == b'6':
                    if nx < Jogo.LARGURA*3-1:
                        nx+=1
                elif key == b'8':
                    if ny > 0:
                        ny -=1
                elif key == b'2':
                    if ny < Jogo.ALTURA*2:                        
                        ny +=1
                elif key == b"S":
                    self.fim_do_jogo()
                    break
            if self.pode_mover(nx,ny) and (nx, ny) != (self.x,self.y):
                self.x, self.y = nx, ny
                self.desenha()

Criamos le_teclado como uma corotina, mas como o módulo colorconsole não foi criado para trabalhar com corotinas, precisamos chamar self.tela.getch usando um Executor, no caso, nosso ThreadPool. Desta forma, nossa chamada bloqueante será executada em um thread do ThreadPool e nosso loop de eventos vai continuar a executar normalmente. Quando pressionarmos uma tecla, a função self.tela.getch vai retornar e a partir daí, trabalharemos com o resultado no yield from. O resto do método verifica se a tecla é de movimento ou de saída do jogo. No final, verificamos se o jogador se mexeu ou se a nova posição seria a de uma parede. Caso a posição seja alterada, redesenhamos a tela, para que a nova posição do jogador seja visível.

    @asyncio.coroutine
    def relogio(self):
        while self.jogando:
            self.tela.print_at(10,23,datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S"))
            yield from asyncio.sleep(1)

A corotina relogio exibe a data e hora atuais a cada 1 segundo na tela. Esta execução ocorre mesmo se nada pressionarmos no teclado, confirmado a correta execução de nosso loop de eventos. Este programa pode ser executado para conter inimigos e movê-los em outra corotina, similar a do relógio.

Testado no Windows 8.1 com Python 3.4.1, colorconsole 0.7.1.

domingo, 10 de agosto de 2014

Asyncio e corotinas

Continuando a série sobre o módulo asyncio do Python 3.4, vou apresentar as corotinas e como elas simplificam a escrita de nossos programas com o loop de eventos. Com a saída do Python 3.4, eu atualizei o livro de Introdução à Programação com Python. Alguns assuntos fogem ao escopo do livro que é destinado a iniciantes. Eu estou continuando uma série de posts curtos sobre alguns tópicos que acho interessantes e quem sabe até podem virar base para um novo livro. Clique aqui para ler o primeiro artigo.

No artigo anterior, apresentamos uma chamada ao loop de eventos bem simples:
import asyncio

def print_and_repeat(loop):
    print('Hello World')
    loop.call_later(2, print_and_repeat, loop)

loop = asyncio.get_event_loop()
loop.call_soon(print_and_repeat, loop)
loop.run_forever()

Vejamos como reescrever este exemplo simples usando corotinas:
import asyncio

@asyncio.coroutine
def print_and_repeat(loop):
    while True:
        print('Hello World')
        yield from asyncio.sleep(2)

loop = asyncio.get_event_loop()
try:
 loop.run_until_complete(print_and_repeat(loop))
finally:
 loop.close()

Este exemplo foi extraído da documentação do Python, vamos ver o que mudou. A principal mudança no início do programa é o uso do decorador @asyncio.coroutine. Este decorador transforma nossa função em uma corotina e permite a utilização do yield from, como definido na PEP380. Veja que o cabeçalho da função não foi alterado, mas que substituímos a chamada de loop.call_later pela combinação de um while True com um yield from no final. Uma corotina pode ser suspensa e esperar o processamento de uma outra corotina. No caso, asyncio.sleep(2) é uma corotina do módulo asyncio que suspende a execução da função pelo número de segundos passados como parâmetro. Na realidade, esta chamada retorna uma corotina que é marcada como completa após os 2 segundos. Isto pode ser realizado pois no yield from, criamos a nova corotina e indicamos ao loop que não continue a executar print_and_repeat até que a nova corotina esteja concluída. A partir deste ponto, a execução volta ao loop de eventos que monitora a conclusão da nova corotina criada, suspendendo a execução da anterior. Uma vez que o a corotina do sleep é concluída, após 2 segundos, o loop reativa a chamada suspensa de print_and_repeat e a execução continua, voltando para o while. Parece complicado, mas veja como ficou fácil de escrevermos a função. Fica bem mais claro nossa intenção de realizar uma repetição do print('Hello World') a cada 2 segundos.

Modificamos também a chamada de execução da corotina, pois agora utilizamos loop.run_until_complete para iniciar nossa corotina principal. Aproveitamos para colocar tudo entre um try...finally para terminar a execução do loop corretamente (mesmo em caso de exceção). Perceba que no exemplo anterior, com call_soon, passamos a função e seus parâmetros, mas não executamos a função em si. No caso de run_until_complete, estamos passando o retorno da chamada de print_and_repeat que é uma corotina, uma vez que a marcamos com o decorador @asyncio.coroutine.

No post anterior comparamos a velocidade de execução entre as várias formas de se executar código em paralelo com Python. Agora veremos como usar o asyncio para criar uma aplicação prática, como um cliente e um servidor TCP/IP, mas indo além dos exemplos da documentação do Python. É preciso lembrar que o módulo asyncio ainda é muito novo e que tanto a documentação quanto a implementação de algumas funcionalidades ainda estão sendo alteradas.

Vamos começar pelo servidor. Um servidor TCP/IP é um exemplo clássico de programa chato a escrever. Normalmente, você pode escolher utilizar threads ou se aventurar com select e chamadas não bloqueantes para gerenciar várias conexões. Este problema se agrava em aplicações mais complexas, onde algum processamento precisa ser realizado antes de se gerar a resposta, por exemplo, a um comando do usuário. Usando o módulo asyncio, esta tarefa fica bem mais fácil. Primeiro, porque o tratamento de dados é gerenciado por uma classe, responsável pelo protocolo. Esta classe traz métodos que são chamados em situações comuns ao programarmos um servidor TCP/IP, como chegada de uma nova conexão, desconexão, chegada de dados para leitura entre outras. Além disso, o asyncio também traz classes especializadas em quebrar os dados em linhas, o que facilita a implementação de protocolos com comandos em formato texto, terminados por enter (CR).

O servidor é controlado por uma classe chamada EchoServer, pois o desenvolvi a partir do servidor de Echo dado como exemplo na documentação, mas com alguns detalhes que observei no código do módulo asyncio. O protocolo implementado é bem simples, a cada linha, a data e hora atuais são enviadas. Se o cliente enviar sair a conexão é terminada. Vamos ver o programa completo e discutir parte por parte.

import asyncio
import time
from common import *

class EchoServer(asyncio.streams.FlowControlMixin):    
    ativas = 0
    def connection_made(self, transporte):        
        peername = transporte.get_extra_info('peername')
        print('Conexão de {}'.format(peername))
        EchoServer.ativas+=1
        print("Conexões ativas: {}".format(EchoServer.ativas))
        self.transporte = transporte
        self.leitor = asyncio.StreamReader()
        self.leitor.set_transport(self.transporte)
        self.escritor = asyncio.StreamWriter(transport=self.transporte, protocol=self, reader=self.leitor, loop=asyncio.get_event_loop())        
        asyncio.async(self.gerencia())
    
    @asyncio.coroutine
    def gerencia(self):        
        while True:
            dados = yield from self.leitor.readline()                        
            self.escritor.write(strToByte(time.strftime("%c")+"\r\n"))            
            yield from self.escritor.drain()
            comando = byteToStr(dados).strip().lower()
            if(comando == "sair"):
                self.transporte.close()
                return

    def connection_lost(self, exp):                
        EchoServer.ativas-=1
        print("Conexões ativas: {}".format(EchoServer.ativas))
        super().connection_lost(exp)

    def data_received(self, dados):                
        print('data received: {0}'.format(byteToHex(dados)))
        print('     received: {0}'.format(strPrintable(dados)))
        print('       string: {0}'.format(byteToStr(dados)))
        self.leitor.feed_data(dados)
        

loop = asyncio.get_event_loop()
coro = loop.create_server(EchoServer, '127.0.0.1', 8888)
servidor = loop.run_until_complete(coro)
print('Escutando {}'.format(servidor.sockets[0].getsockname()))

try:
    loop.run_forever()
except KeyboardInterrupt:
    print("exit")
finally:
    servidor.close()    
    loop.close()

A classe EchoServer herda de uma classe fornecida em asyncio.streams, chamada FlowControlMixin. A classe FlowControlMixin é por sua vez derivada de Protocols, também fornecida pelo módulo asyncio. A ideia desta classe é implementar um protocol factory, ou seja, um construtor de instâncias responsáveis pela implementação da gestão de cada nova conexão. Um protocolo normal, precisa herdar apenas de Protocols, mas para utilizar alguns métodos para leitura buferizada de linhas, especialmente o write.drain que veremos logo após, a implementação contida em FlowControlMixin é interessante.

Aproveitamos a nova classe para contar o número de conexões ativas. Cada nova conexão recebida por nosso servidor TCP/IP chama o construtor de nossa classe e o método connection_made, passando o transporte (entenda como o socket já conectado) como parâmetro.

Utilizando o parâmetro transporte, chamamos o método get_extra_info('peername') para obter o endereço do cliente que acabou de se conectar ao servidor. Logo em seguida, incrementamos o número de conexões. Veja que como o código que roda no loop de eventos não é multi-threaded, não precisamos de locks ou de outros mecanismos de controle, já que apenas uma função roda a cada vez. O resto do método connection_made prepara as instâncias do leitor e do escritor, objetos das classes StreamReader e StreamWriter respectivamente. Estes objetos vão fornecer corotinas úteis para ler e escrever os dados de forma não bloqueante. Veja que passamos transporte tanto para o escritor quanto para o leitor e que uma série de parâmetros são necessários para a inicialização do escritor.

No fim de connection_made, usamos a função asyncio.async para iniciar o processamento da corotina self.gerencia, dentro do loop de eventos. Veja que o método gerencia foi marcado com o decorador @asyncio.coroutine.

O método gerencia contém uma estrutura de repetição while que espera uma linha do cliente. Veja que utilizamos yield from para suspender a execução de gerencia enquanto self.leitor.readline() não terminar. Neste ponto, a execução volta para o loop de eventos e retorna apenas quando self.leitor.readline() contém uma linha enviada pelo cliente ou caso uma exceção tenha ocorrido. O uso do yield from é fundamental, pois caso o readline() esperasse o cliente enviar a linha para continuar a execução, todo o loop de eventos seria bloqueado. Como o uso do yield from, a execução volta para o loop que é livre para executar outros métodos e outras corotinas. O objetivo é não fazer o computador esperar por dados ou resultados que demoram muito tempo (ou um tempo desconhecido, possivelmente longo, para retornar). Outra característica de yield from é que o resultado do self.leitor.readline() é retornado e no caso, armazenado na variável dados.

A execução segue normalmente e nosso servidor envia a hora e a data atual, veja que uma linha foi acrescentada ao final da string. Este fim de linha é importante, pois como nosso protocolo é em formato texto e orientado a linhas, esperamos o enter (CR) para processar o comando ou a resposta.
Depois de escrever a resposta, usamos self.escritor.drain() que é uma outra corotina. Esta nova corotina não completa até o que o buffer de escrita seja enviado. Desta forma, podemos garantir que os dados foram enviados (ainda que não possamos ter certeza se estes foram recebidos pelo cliente) antes de continuarmos. Como usamos yield from com esta corotina, a execução é suspensa até que o drain seja completado.

Como usamos Python 3.4, os dados são do tipo byte e não string. A função byteToStr converte de bytes para string, usando a codificação UTF-8. Esta função será apresentada no programa common.py, compartilhando rotinas úteis tanto para nosso cliente quanto para nosso servidor. Para facilitar o processamento de comandos, retiramos os espaços em branco do início e fim do comando, inclusive enter (CR) e LF, e convertemos o resultado para minúsculas com lower. Se o comando for igual a "sair", chamamos o método close de self.transporte para encerarmos a conexão. Veja que ao fecharmos a conexão, finalmente retornamos como em uma função normal, utilizando return e terminando assim a execução de nossa corotina gerencia.

O método connection_lost é chamado quando a desconexão do cliente é detectada. O número de conexões ativas é decrementado e o método connection_lost da superclasse é chamado. O parâmetro exp contém None caso seja uma desconexão normal ou a exceção em caso de erro. Neste exemplo não estamos tratando os possíveis erros para nos concentrarmos no asyncio.

Já o método data_received é chamado sempre que dados forem recebidos pelo transporte. Os dados recebidos são passados como parâmetro (dados). Aqui, incluí algumas funções de debug que exibem os dados em formato hexadecimal, string e UTF-8. Estas funções são necessárias para verificarmos se os dados estão chegando no formato esperado. Você pode executar um teste com um programa de telnet clássico, como o Putty no Windows, mas não esqueça de desativar a opção de negociação do protocolo, para evitar que comandos que não interpretamos sejam enviados, ou simplesmente, teste com o cliente que é apresentado logo abaixo.

Um detalhe muito importante de data_received é a chamada do método self.leitor.feed_data, que envia os dados para o leitor, responsável por quebrar os dados em linhas.

Em nosso programa principal, obtemos o loop de eventos com get_event_loop() e criamos uma corotina que inicializa nosso servidor com loop.create_server. Em create_server, informamos o endereço que nosso servidor irá escutar (ip e porta). Veja que a classe EchoServer foi passada como protocol factory.

Ao chamarmos loop.run_until_complete(coro), o loop de eventos roda até que a corotina criada pelo create_server termine, retornando um objeto servidor, utilizado para parar o servidor e para ter acesso a todas as conexões, mas isso fica para outro post.

Chamamos loop.run_forever() para ativar nosso servidor. Para desativá-lo, digite CTRL+C.

Neste ponto, o endereço 127.0.0.1, porta 8888 estará recebendo conexões. Quando uma conexão for recebida, uma nova instância de EchoServer será criada. Ao se estabelecer a conexão o método connection_made será chamado e ativará uma corotina gerencia para gerenciar a recepção e o envio de linhas de comandos. O método data_received é chamado sempre que novos dados forem recebidos (seja uma linha completa ou não). O método connection_lost é chamado quando o cliente se desconectar.

Vejamos o código fonte de common.py:

import string

def byteToHex(data, sep=" "):
    return sep.join("{0:02X}".format(x) for x in data)

def strPrintable(data, sep=" "):
    return sep.join("{0:2s}".format(chr(s) if chr(s) in string.printable and s>30 else ".") for s in data)

def strToByte(s, encoding ="utf-8"):
    return s.encode(encoding)

def byteToStr(data, encoding ="utf-8"):
    return data.decode(encoding, errors="replace")

E o código fonte de nosso cliente.py:

import asyncio
import time
from common import *

class EchoClient(asyncio.streams.FlowControlMixin):        
    def connection_made(self, transporte):        
        peername = transporte.get_extra_info('peername')
        print('Conectado à {}'.format(peername))                
        self.transporte = transporte
        self.leitor = asyncio.StreamReader()
        self.leitor.set_transport(self.transporte)
        self.escritor = asyncio.StreamWriter(transport=self.transporte, protocol=self, reader=self.leitor, loop=asyncio.get_event_loop())        
        asyncio.async(self.gerencia())
        self.feito = asyncio.Future()
    
    @asyncio.coroutine
    def gerencia(self):        
        for x in range(10):                        
            self.escritor.write(strToByte("Alô\r\n"))
            yield from self.escritor.drain()

            dados = yield from self.leitor.readline()            
        self.escritor.write(strToByte("sair\r\n"))
        yield from self.escritor.drain()                                         
        self.transporte.close()
        self.feito.set_result(True)

    def connection_lost(self, exp):        
        print("Conexão perdida")
        super().connection_lost(exp)

    def data_received(self, dados):                
        print('dados recebidos: {0}'.format(byteToHex(dados)))
        print('      recebidos: {0}'.format(strPrintable(dados)))
        print('         string: {0}'.format(byteToStr(dados)))
        self.leitor.feed_data(dados)

        
loop = asyncio.get_event_loop()
coro = loop.create_connection(EchoClient, '127.0.0.1', 8888)
transporte, protocolo = loop.run_until_complete(coro)

try:
    loop.run_until_complete(protocolo.feito)
except KeyboardInterrupt:
    pass
finally:    
    loop.close()

Execute o servidor e depois o cliente, cada em uma janela ou terminal diferente. Veja que o cliente termina sua execução após enviar 10 vezes o comando Alô e sair. Execute várias vezes o cliente e veja que o servidor continua ativo. Experimente aumentar o número de comandos de 10 para 100 no cliente e reexecute. Tente executar a partir de uma terceira janela outro cliente simultaneamente. Observe que conseguimos implementar um cliente e um servidor TCP/IP em um pouco mais de 100 linhas de código em Python. Um servidor capaz de atender vários clientes sem utilizar múltiplos threads. Você pode comentar ou remover os prints que não precisar, eles servem apenas para debugar.

O código do cliente é muito parecido com o código do servidor. A principal mudança é o método gerencia e o atributo self.feito, criado com asyncio.Future(). Vejamos a criação da instância de nosso cliente. A função create_conection é na realidade uma corotina que recebe o protocolo (no caso EchoClient), o ip e a porta do servidor. Ao chamarmos loop.run_until_complete(coro), uma tupla com o transporte e o protocolo é retornada. Este retorno é importante, pois precisamos ter acesso a instância de EchoClient criada para gerenciar nossa conexão, no caso protocolo. Com a instância de EchoClient retornada em protocolo, podemos rodar o loop até que feito seja marcada como finalizada: loop.run_until_complete(protocolo.feito). Esta etapa é importante, pois devemos executar o loop até que o cliente tenha tempo para terminar seu trabalho. Veja que no final de gerencia, marcamos self.feito como concluída: self.feito.set_result(True).

Se você quiser testar o servidor com vários clientes simultaneamente, adicione a seguinte função ao código de cliente.py e modifique o programa principal para:

@asyncio.coroutine
def roda_varias(vezes):
    pendente = []
    for t in range(vezes):
        pendente.append(asyncio.async(loop.create_connection(EchoClient, '127.0.0.1', 8888)))
    for y in pendente:            
        transporte, protocolo = yield from y
        yield from asyncio.wait_for(protocolo.feito, None)

loop = asyncio.get_event_loop()
coro = loop.create_connection(EchoClient, '127.0.0.1', 8888)
client = loop.run_until_complete(coro)

try:
    loop.run_until_complete(roda_varias(100))
except KeyboardInterrupt:
    pass
finally:    
    loop.close()

Você também pode utilizar start_server e open_connection para receber diretamente uma tupla com StreamReader e StreamWriter, mas estes exemplos você pode encontrar na documentação do Python.

No próximo artigo, uma nova classe comum será usada para gerenciar os protocolos e outra forma de instanciação será passada para realizar uma comunicação entre vários clientes.