Adafruit Macropad : multitâche coopératif avec asyncio

Nous avons découvert dans mon précédent article le Macropad et comment utiliser la connexion série pour envoyer et recevoir des données depuis un ordinateur.

Dans la première partie de cet article, nous avions été confrontés à un problème alors que nous voulions changer la fréquence du clignotement de la DEL : il fallait attendre que l’instruction time.sleep soit terminée.

Nous allons vois ici comment utiliser asyncio et l’implémentation de async / await pour corriger ce problème. Nous essaierons ensuite d’aller plus loin afin d’appréhender cette bibliothèque.

Qu’est-ce qu’asyncio?

C’est une bibliothèque de Python qui permet de réaliser de la programmation asynchrone. Le principe ici est d’éviter que le programme attende sans rien faire. Ça tombe bien puisque dans l’exemple de la DEL qui clignote, notre programme ne fait qu’attendre, il est d’ailleurs impossible de traiter les entrées / sorties sur le port série à ce moment.

Attention, il n’est pas question ici de parallélisme, asyncio ne gère pas de fils d’exécution. Dans l’interpréteur Python c’est la classe Thread qui s’en charge. Mais d’abord CircuitPython ne les supporte pas et ils sont plus difficiles à gérer (concurrences, sections critiques, etc.).

Dans le cas d’asyncio, nous parlerons de coroutines pour des fonctions déclarées avec le mot clef async.

Installer asyncio sur le Macropad

Il est possible que la bibliothèque asyncio et ses dépendances ne soient pas installées, il est alors possible de les installer avec circup. Commençons par créer un environnement virtuel Python:

python -m venv ~/venv_circup
source ~/venv_circup/bin/activate
pip install circup

Branchez ensuite le Macropad, montez le périphérique de stockage CIRCUITPY puis lancez la commande suivante:

circup install asyncio

Et voilà!

La DEL clignotante

Utilisons maintenant asyncio pour réécrire le code de la DEL clignotante. Voici le code à placer dans le fichier code.py à la racine du disque CIRCUITPY vous pouvez télécharger le fichier code.py ici:

from adafruit_macropad import MacroPad
import usb_cdc, asyncio

macropad = MacroPad()
timer = 1
color = 0xad20b4
led = 0

def exec_command (data):
    global timer
    command,option = data.split()
    print("cmd:{} opt:{}".format(command,option))
    if command == 'time':
        timer = float(option)

async def blink(led, color):
    #global timer
    macropad.pixels.brightness = 0.3
    while True:
        if  macropad.pixels[led] == (0,0,0):
            macropad.pixels[led] = color
        else:
            macropad.pixels[led] = 0x0
        print("l:{} c:{} t:{}".format(led,hex(color),timer))
        await asyncio.sleep(timer)

async def check_serial():
    data = bytearray()
    serial = usb_cdc.data
    # Avec le timeout, même si la ligne ne contient pas \n, elle
    # sera pris en compte
    serial.timeout = 1
    while True:
        if serial.in_waiting:
            data = serial.readline()
            print("serial data received")
            exec_command(data.decode("utf-8"))
        await asyncio.sleep(0)

async def main():
    t_led = asyncio.create_task(blink(led, color))
    t_serial = asyncio.create_task(check_serial())
    await asyncio.gather(t_led, t_serial)

asyncio.run(main())

Notre fonction de clignotement est définie par async def blink() : c’est maintenant une fonction asynchrone. Une fois l’état de la DEL changé, notre fonction d’attente asyncio.sleep() remplace timer.sleep. C’est en effet ce module qui se charge maintenant de l’attente, ainsi il pourra gérer l’enchainement des tâches.

La partie du code utilisé pour la gestion des entrées par le port série est elle aussi placée dans la fonction asynchrone check_serial(). Cette fois asyncio.sleep est définie à 0 afin de laisser l’ordonnanceur du module asyncio interrompre notre fonction et ainsi empêcher une coroutine de bloquer les autres en ne rendant jamais la main1.

Une nouvelle fonction asynchrone fait son apparition : main(). C’est ici que nous définissons nos tâches (contenants nos coroutines) t_led ett_serial. Ensuite asyncio.gather() permet de lancer l’exécution concurrente de nos deux tâches.

Enfin, notre fonction main() est lancée via asyncio.run permettant alors de l’exécuter jusqu’à sa fin.

Une fois le fichier sauvegardé, la DEL n°0 devrait se mettre à clignoter avec une fréquence d’une seconde.

Oui mais!

Lançons minicom sur le port série affichant la console . Nous observons ainsi les actions effectuées par notre programme grâce aux différents print disséminés dans le code.

minicom -D /dev/ttyACM0 -c on

Maintenant, depuis un autre terminal, changeons la fréquence de clignotement de la DEL à 20 secondes pour la remettre aussitôt à 1 seconde:

printf "time 20\n" > /dev/ttyACM1
printf "time 1\n" > /dev/ttyACM1

En observant les messages affichés dans la minicom, nous pouvons voir que contrairement à la version sans coroutine, les messages sont reçus et traités immédiatement. Voici les messages affichés via nos différents print() dans minicom:

[...]
l:0 c:0xad20b4 t:1.0
l:0 c:0xad20b4 t:1.0
l:0 c:0xad20b4 t:1.0
serial data received
cmd:time opt:20
l:0 c:0xad20b4 t:20.0
serial data received
cmd:time opt:1
l:0 c:0xad20b4 t:1.0
l:0 c:0xad20b4 t:1.0
[...]

Les lignes serial data received et cmd:time opt:<> sont apparues immédiatement, mais lors du changement de fréquence de 20 à 1, il a fallu attendre que les 20 secondes définies précédemment soient écoulées pour que la modification soit prise en compte par la coroutine blink().

Prendre en compte immédiatement la modification de fréquence

Comment pouvons nous faire en sorte que la modification de fréquence soit immédiatement répercutée ? Tout simplement en utilisant la méthode cancel() sur notre tâche t_led. Celle-ci lève l’exception CancelError dans la coroutine blink(). C’est bien entendu à nous d’implémenter la gestion de cette exception.

Vous pouvez télécharger le fichier code.py ici, voici le code :

from adafruit_macropad import MacroPad
import usb_cdc, asyncio

macropad = MacroPad()
timer = 1
color = 0xad20b4
led = 0

def exec_command (data):
    global timer
    command,option = data.split()
    print("cmd:{} opt:{}".format(command,option))
    if command == 'time':
        timer = float(option)

async def blink(led, color):
    macropad.pixels.brightness = 0.3
    while True:
        try:
            if  macropad.pixels[led] == (0,0,0):
                macropad.pixels[led] = color
            else:
                macropad.pixels[led] = 0x0
            print("l:{} c:{} t:{}".format(led,hex(color),timer))
            await asyncio.sleep(timer)
        except asyncio.CancelledError:
            pass

async def check_serial(task_led):
    data = bytearray()
    serial = usb_cdc.data
    # Avec le timeout, même si la ligne ne contient pas \n, elle
    # sera pris en compte
    serial.timeout = 1
    while True:
        if serial.in_waiting:
            data = serial.readline()
            print("serial data received")
            exec_command(data.decode("utf-8"))
            task_led.cancel()
        await asyncio.sleep(0)

async def main():
    t_led = asyncio.create_task(blink(led, color))
    t_serial = asyncio.create_task(check_serial(t_led))
    await asyncio.gather(t_led, t_serial)

asyncio.run(main())

C’est notre fonction check_serial() qui lance le cancel() à partir de la tâche t_led que nous lui passons en paramètre. L’effet est alors immédiat : l’exception CancelError est lancée dans blink() forçant cette dernière à recommencer depuis le début du while. Notre modification est donc appliquée immédiatement.

Gestion dynamique des tâches

Maintenant que nous avons vu le fonctionnement de base d’asyncio, prenons un exemple un peu plus complet.

Vous pouvez télécharger le fichier code.py ici, voici le code :

from adafruit_macropad import MacroPad
import usb_cdc, asyncio, random

macropad = MacroPad()
macropad.pixels.brightness = 0.3
color = 0xad20b4

async def get_key(taskslist):
    while True:
        key_event = macropad.keys.events.get()
        if key_event:
            if key_event.pressed:
                print("key k:{} t:{}".format(key_event.key_number, len(taskslist)))
                taskslist.append(asyncio.create_task(blink(key_event.key_number)))
        await asyncio.sleep(0)

async def blink(led):
    timer = random.random() * 3
    for _ in range(5):
        macropad.pixels[led] = color
        await asyncio.sleep(timer)
        macropad.pixels[led] = 0x0
        await asyncio.sleep(timer)

async def manage_tasks(taskslist):
    tasks = 0
    print("Run task manager  t:{}".format(len(taskslist)))
    while True:
        for task_number in range(0, len(taskslist)):
            if taskslist[task_number].done():
                print("Remove task t:{}/{}".format(task_number + 1,len(taskslist)))
                taskslist.pop(task_number)
                break
        await asyncio.sleep(0)

async def main():
    tasks = []
    tasks.append(asyncio.create_task(get_key(tasks)))
    tasks.append(asyncio.create_task(manage_tasks(tasks)))
    await asyncio.gather(*tasks)

asyncio.run(main())

Lorsque nous appuyons sur une des douze touches du clavier de notre MacroPad, la DEL en dessous clignote cinq fois. Vous l’aurez compris, chaque clignotement est en fait une coroutine gérée par asyncio.

Les différentes tâches sont répertoriées dans un tableau qui est passé à la fonction asyncio.gather(). Plus intéressant, afin de supprimer de notre tableau les coroutines terminées, nous passons par la fonction manage_tasks() en utilisant asyncio.done() afin de vérifier que la tâche testée soit bien terminée.

Nous avons donc deux tâches lancées dès le départ :

  • get_key() chargée d’écouter les évènements clavier et de lancer les différents clignotements en fonction de la touche appuyée ;
  • manage_tasks chargée de nettoyer le tableau des tâches ;

Notre fonction manage_tasks() est simple, nous parcourons l’ensemble du tableau et lorsque nous trouvons une coroutine terminée nous supprimons l’élément du tableau. Remarquez le break lorsque notre fonction supprime un élément de notre taskslist, c’est nécessaire afin d’éviter une erreur d’index dû au fait que notre tableau contiendra un élément de moins.

Voici l’affichage des informations sur l’écran du Macropad (ou sur la console minicom, qui est plus agréable) :

Run task manager  t:2
key k:0 t:2
key k:3 t:3
key k:6 t:4
Remove task t:3/5
Remove task t:4/4
Remove task t:3/3

Nous pouvons ainsi voir la création des tâches par la fonction get_key() puis leur destruction dans manage_tasks() — merci les print().

Gestion de la concurrence

Contrairement à sa grande sœur CPython, notre implémentation de Python ici présente ne contient pas de primitive de synchronisation comme les sémaphores, variables conditions, … : seul le Lock est présent. C’est l’implémentation Python des Mutex : l’accès à une partie du code protégé par un Mutex sera sérialisé. Une seule coroutine y aura accès à la fois, les autres attendent leur tour une à une.

Voici un exemple de code pour notre Macropad utilisant Lock, vous pouvez télécharger le fichier code.py ici:

from adafruit_macropad import MacroPad
import usb_cdc, asyncio, random

macropad = MacroPad()
macropad.pixels.brightness = 0.3
run_color = 0xad20b4
wait_color = 0x21f312

# déclaration de notre Mutex
blink_mutex = asyncio.Lock()

async def get_key(taskslist):
    while True:
        key_event = macropad.keys.events.get()
        if key_event:
            if key_event.pressed:
                taskslist.append(asyncio.create_task(blink(key_event.key_number)))
        await asyncio.sleep(0)

async def blink(led):
    timer = random.random() * 3
    macropad.pixels[led] = wait_color
    if blink_mutex.locked():
        print("Wait for mutex l:{}".format(led))

    await blink_mutex.acquire()

    print("Aquire mutex l:{}".format(led))
    for _ in range(5):
        macropad.pixels[led] = run_color
        await asyncio.sleep(timer)
        macropad.pixels[led] = 0x0
        await asyncio.sleep(timer)

    blink_mutex.release()

async def manage_tasks(taskslist):
    tasks = 0
    while True:
        for task_number in range(0, len(taskslist)):
            if taskslist[task_number].done():
                taskslist.pop(task_number)
                break
        await asyncio.sleep(0)

async def main():
    tasks = []
    tasks.append(asyncio.create_task(get_key(tasks)))
    tasks.append(asyncio.create_task(manage_tasks(tasks)))
    await asyncio.gather(*tasks)

asyncio.run(main())

Lors de l’appui sur une touche, la fonction blink() DEL passe au vert, si le Mutex est libre, on passe de suite à la phase de clignotement. Ainsi une tâche en attente sera matérialisée.

Nous pouvons observer sur l’écran du Macropad différentes informations relatives à l’attente / acquisition de notre blink_mutex. Ces informations permettent de comprendre ce qui se passe lors de l’exécution de nos coroutines :

run task manager t:2
Aquire mutex l:3
Wait for mutex l:7
Wait for mutex l:11
Aquire mutex l:7
Aquire mutex l:11
Aquire mutex l:0
Wait for mutex l:5
Wait for mutex l:6
Aquire mutex l:5
Aquire mutex l:6

En conclusion

Nous avons vu dans cet article, au fils des différents exemples, comment utiliser asyncio pour gérer des coroutines, permettant d’implémenter dans nos programmes pour notre Macropad du multitâche coopératif.

Un petit tour sur la documentation spécifique à asyncio dans CircuitPython vous permettra d’aller plus loin. Elle est cependant très succincte et manque cruellement d’exemples2. Si vous voulez en apprendre plus sur la programmation asynchrone en Python, vous avez aussi l’excellent tutoriel de nohar sur Zeste de savoir, il m’a été d’une grande aide pour la rédaction de cet article.

Credits

Les photos proviennent du site Adafuit, prisent par Kattni Rembor et sous licence Creative Common By-Sa.


  1. Cette technique permet aussi de rendre une fonction compatible asynchrone 

  2. La dernière mise à jour date de juin 2022 au moment de la rédaction de cet article