[Python] Un mémo que j'ai essayé de démarrer avec asyncio

Qu'est-ce que l'asyncio?

asyncio est une bibliothèque pour écrire du code de ** traitement parallèle ** en utilisant la syntaxe ** async / await **.

asyncio est utilisé comme base pour plusieurs frameworks Python asynchrones tels que des réseaux et des serveurs Web hautes performances, des bibliothèques de connectivité de base de données et des files d'attente de tâches distribuées.

asyncio --- E / S asynchrones - Documentation Python 3.9.0

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

Qu'est-ce que c'est, n'est-ce pas comme async / await de C # ... Je suis gêné d'utiliser Python, mais je ne le savais que récemment. Parfois, j'écris du code C #, mais chaque fois que je touche async / await, je le voulais aussi pour Python ... Programmation asynchrone en C # | Microsoft Docs

En parlant de traitement asynchrone en Python

C'était une image autour. Parallèle - Documentation Python 3.9.0 Asyncio sera-t-il une option à l'avenir?

Environnement de vérification

Hello World!

L'exemple au début est le code trouvé dans le document officiel ci-dessus. Commençons par déplacer ceci. Comme vous pouvez le voir dans les commentaires du code, cela fonctionne avec ** Python 3.7+ **. Aucune préparation particulière telle que l'installation de bibliothèques supplémentaires n'est requise.

helloworld.py


import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

Terminal


$ python3 helloworld.py
Hello ...
... World!

Assurez-vous qu'il y a une seconde entre Hello ... et ... World! Les points sont les suivants.

Mais je ne suis pas content de ça seul, non? Ce n'est pas un traitement asynchrone.

Exemple de traitement asynchrone approprié

Alors qu'en est-il de ça?

async_sleep1.py


import asyncio

async def func1():
    print('func1() started')
    await asyncio.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    await asyncio.sleep(1)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())

Quand je l'exécute, func1 () '' et func2 () '' démarrent presque en même temps, et après 1 seconde, func1 () '' et func2 () '' se terminent presque en même temps. Vous pouvez voir qu'il y en a.

func1() started
func2() started
func1() finished
func2() finished

En gros, «attendre» est une image de ** «donnez votre tour à une autre personne et attendez-vous» **. Ici, le "travail de l'autre personne" est la tâche, qui peut être créée par l'une des méthodes suivantes.

--Spécifiez un processus prédéfini tel que ʻasyncio.sleep () `

Il existe une méthode appelée. Ici, écrire simplement func1 () ne fait rien, mais en l'utilisant comme ʻasyncio.create_task (func1 ()) , func1 () `fonctionne. Vous pouvez le mettre en veille afin de pouvoir le démarrer. Le simple fait de dire «veille» ne signifie pas qu'il commencera à fonctionner immédiatement (décrit plus loin).

asyncio.sleep?

Ce qui est inquiétant ici, c'est «attend asyncio.sleep (1)». Je comprends le sens, mais quand il s'agit de "attendre n secondes", `` time.sleep () '' est la norme. Et si je le change ici?

async_sleep2.py


import asyncio

async def func1():
    print('func1() started')
    #await asyncio.sleep(1)
    time.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    #await asyncio.sleep(1)
    time.sleep(1)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())

Malheureusement, func2 () '' ne démarrera pas avant la fin de func1 () ''.

func1() started
func1() finished
func2() started
func2() finished

Afin d'exécuter les deux processus en parallèle, il est nécessaire d'arrêter les processus avec «attend asyncio.sleep (1)».

coroutine asyncio.sleep(delay, result=None, *, loop=None)

delay S'arrête pendant une seconde. Si le résultat est fourni, il sera renvoyé à l'appelant une fois le collout terminé. sleep () suspend toujours la tâche en cours et autorise l'exécution d'autres tâches.

Le point est, "Toujours suspendre la tâche en cours et permettre à d'autres tâches de s'exécuter." Si vous appelez ʻasyncio.sleep () `pour attendre, ** abandonnez votre tour si le travail de quelqu'un d'autre arrive pendant que vous attendez. ** En d'autres termes, ** Le simple fait de laisser le processus prendre beaucoup de temps ne permet pas à d'autres processus d'entrer. ** **

Un autre exemple déroutant.

async_sleep3.py


import asyncio
import time

async def func1():
    print('func1() started')
    await asyncio.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    time.sleep(2)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())

func1 () utilise ```asyncio.sleep () pour attendre, tandis que func2 () utilise time.sleep () . Et func2 () '' a un temps de sommeil plus long. Le résultat de cette exécution est ...

func1() started
func2() started
func2() finished
func1() finished

Ce sera comme ça. ** func1 (), qui ne devrait dormir que pendant 1 seconde, se termine en quelque sorte après `` func2 () ''. ** **

Revenons maintenant à la description dans le document précédent.

sleep () suspend toujours la tâche en cours et autorise l'exécution d'autres tâches.

** Vous pouvez donner votre tour à quelqu'un d'autre avec ʻasyncio.sleep () , mais vous ne pouvez pas interrompre volontairement. ** Si un nouveau processus arrive pendant le sommeil avec ʻasyncio.sleep (), il passera à cela, mais juste parce que le temps de sommeil est terminé, il est en cours d'exécution (ʻasyncio.sleep ()Il n'interrompt pas le processus (pas dans) et retourne le contrôle. Dans l'exemple précédent, après avoir attendu 1 seconde avec func1 () '', func2 () '' est toujours en cours d'exécution, alors ne le forcez pas à se relayer. Attendez que `` func2 () '' se termine avant de continuer. C'est une loi. [^ func2]

[^ func2]: Bien sûr, si func2 () abandonne l'ordre avec ʻasyncio.sleep () , func1 () `peut commencer à fonctionner.

Illustré sur la base de ce qui précède

Est-ce que c'est comme ça si vous illustrez le traitement de «async_sleep1.py» qui peut être bien traité en parallèle? Les zones colorées représentent la durée pendant laquelle vous avez réellement le contrôle. Dans ce qui suit, ʻasyncio.sleep () est abrégé en ʻaio.sleep () ``.

image.png

  1. main () '' met task1et task2en attente avec create_task () ''. task1 '' et task2 '' ne démarrent pas immédiatement et attendent.
  2. En raison de «« attend »,« main () »abandonne le contrôle et attend que« task1 »se termine. task1 reçoit le contrôle.
  3. task1 libère le contrôle en exécutant ```asyncio.sleep (1) et attend 1 seconde. La tâche2 '' reçoit le contrôle.
  4. task2 libère le contrôle en exécutant ```asyncio.sleep (1) `` et attend 1 seconde. Aucun traitement ne peut être exécuté.
  5. task1 a attendu 1 seconde et reçoit le contrôle.
  6. Maintenant que le traitement de tâche1 est terminé, `` main () '' reprend le contrôle.
  7. Avec ```await, main () abandonne le contrôle et attend que task2`` se termine. Aucun traitement ne peut être exécuté.
  8. task2 attend 1 seconde et reçoit le contrôle.
  9. Maintenant que le traitement de tâche2 est terminé, `` main () '' reprend le contrôle.
  10. Le traitement de `` main () '' se termine.

D'autre part, le processus de `ʻasync_sleep3.py`` sera illustré comme suit.

image.png

Si vous utilisez time.sleep () '', task2 '' est toujours en contrôle. Par conséquent, la tâche1 '' ne peut pas reprendre avant la fin de la tâche2 '', même si elle a attendu 1 seconde.

Eh bien, par ici, nous remarquons. ** Il n'est pas possible de l'utiliser pour effectuer d'autres traitements en parallèle lors de la rotation du calcul sur la CPU. ** ** Plusieurs processus ne s'exécutent pas en même temps. ** Il s'agit en fait d'un seul thread. ** **

Par exemple, s'il est basé sur `` threading.Thread '', vous pouvez faire ce qui suit. [^ 1]

[^ 1]: Dans ce code, les deux processus fonctionnent en parallèle (apparemment), mais le GIL ne réduit pas le temps de traitement total. Si vous voulez accélérer avec le multi-core, le `` multiprocessing.Process '' est mieux. → J'ai étudié GIL que vous devriez savoir si vous faites un traitement parallèle avec Python --Qiita

example_threading.py


import threading

def func1():
    print('func1() started')
    x = 0
    for i in range(1000000):
        x += i
    print('func1() finished:', x)

def func2():
    print('func2() started')
    x = 0
    for i in range(1000000):
        x += i * 2
    print('func2() finished:', x)

def main():
    thread1 = threading.Thread(target=func1)
    thread2 = threading.Thread(target=func2)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

main()
func1() started
func2() started
func1() finished: 499999500000
func2() finished: 999999000000

Si vous regardez de près le nom asyncio

** io ** C'est vrai. En d'autres termes, on peut dire qu'il s'agit d'un mécanisme de traitement (apparemment) parallèle, qui consiste principalement en un traitement en attente d'E / S (entrée / sortie) depuis le début. Cependant, pour effectuer d'autres traitements entre-temps, il est nécessaire d'attendre en utilisant une fonction dédiée, et même time.sleep () '' ne peut pas être utilisé. Je pense que l'attente d'E / S qui apparaît souvent est la transmission et la réception du réseau, mais en déduisant des résultats jusqu'à présent, contrairement à ** threading.Thread '' etc., écrivez le code d'entrée / sortie normalement Cependant, il est peu probable que ce soit un traitement parallèle, alors comment devrions-nous l'utiliser ...? ** **

Pour paralléliser l'entrée et la sortie

Ce que vous pouvez voir jusqu'ici, c'est que pour effectuer des entrées / sorties de manière asynchrone (apparemment en parallèle), il est nécessaire de ** "donner de l'ordre à d'autres traitements en attendant les entrées / sorties" **. C'est. Par conséquent, il existe une fonction dans ʻasyncio`` qui gère le processus d'abandon de l'ordre du temps d'attente d'entrée / sortie, comme `sleep``.

Essayons l'exemple de la communication socket dans le document suivant. Streams - Documentation Python 3.9.0

stream.py


import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Si vous l'exécutez avec une URL attachée à l'argument de ligne de commande comme indiqué ci-dessous, un en-tête HTTP sera renvoyé.

$ python3 stream.py https://www.google.com/
HTTP header> HTTP/1.0 200 OK
HTTP header> Content-Type: text/html; charset=ISO-8859-1
(Omis ci-dessous)

Cependant, cela seul n'est ni asynchrone ni rien, donc ce n'est pas intéressant, alors exécutons le processus qui utilise le processeur en arrière-plan en attendant.

stream2.py


import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

async def cpu_work():
    x = 0
    for i in range(1000000):
        x += i
        if i % 100000 == 0:
            print(x)
            await asyncio.sleep(0.01)

async def main(url):
    task1 = asyncio.create_task(print_http_headers(url))
    task2 = asyncio.create_task(cpu_work())
    await task1
    await task2

url = sys.argv[1]
asyncio.run(main(url))

Comme indiqué ci-dessous, le calcul est effectué dans les coulisses jusqu'à ce qu'une réponse soit renvoyée du serveur, lorsqu'elle est renvoyée, le traitement est effectué, et lorsque le traitement est terminé, le reste du calcul est effectué.

0
5000050000
20000100000
45000150000
80000200000
HTTP header> HTTP/1.0 200 OK
HTTP header> Content-Type: text/html; charset=ISO-8859-1
(Omis)
125000250000
180000300000
245000350000
320000400000
405000450000

```Open_connection () et readline () abandonnent le contrôle et abandonnent les autres jusqu'à ce qu'une réponse soit renvoyée. Le cpu_work () '' (`` task2 '') qui a reçu l'ordre utilise le CPU pour effectuer des calculs, mais essaie périodiquement de donner l'ordre à une autre personne. Si personne ne vient, continuez le calcul, si quelqu'un vient, abandonnez le tour et attendez que le travail de l'autre personne soit terminé.

Le but ici est d'écrire ```await asyncio.sleep (0.01) dans cpu_work () , et si vous oubliez cela, même si une réponse est retournée du serveur lors du calcul, elle sera traitée. Je ne peux pas. <s> Ancient </ s> Réminiscent du DoEvents () '' qui était dans Visual Basic 6.0. Lors du traitement de boucle lourde, si vous ne l'appelez pas dans la boucle, la fenêtre se fige sans réponse. Si vous ne venez pas avec une épingle, demandez à la personne à la maison. </ s>

Je veux vraiment traiter en parallèle

Cependant, jusqu'à présent, il s'agit d'un seul fil. Vous pouvez écrire avec `ʻawait`` pour exécuter plusieurs processus de rotation du processeur en même temps. De plus, même si le processus implique l'attente des entrées / sorties, il est assez gênant d'être conscient de «attend» à chaque fois que «read» ou «write», donc je pense que cela peut être amusant. Je vais.

Une façon de faire est d'utiliser `` run_in_executor () '' pour remplir le pool de processus (ou pool de threads).

executor.py


import asyncio
import concurrent.futures

def func1():
    print("func1() started")
    s = sum(i for i in range(10 ** 7))
    print("func1() finished")
    return s

def func2():
    print("func2() started")
    s = sum(i * i for i in range(10 ** 7))
    print("func2() finished")
    return s

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        task1 = loop.run_in_executor(pool, func1)
        task2 = loop.run_in_executor(pool, func2)
        result1 = await task1
        result2 = await task2
        print('result:', result1, result2)

asyncio.run(main())

Si vous faites cela, vous verrez que func1 () '' et func2 () '' s'exécutent en même temps, comme indiqué ci-dessous. [^ 3]

[^ 3]: Uniquement lorsque le nombre de cœurs logiques est égal ou supérieur à 2. Mais à moins que vous n'utilisiez des services cloud comme VPS ou AWS ces jours-ci, vous aurez presque certainement plus de deux cœurs.

func1() started
func2() started
func1() finished
func2() finished
result: 49999995000000 333333283333335000000

En fait, vous pouvez utiliser un mécanisme appelé «pool de processus» pour exécuter plusieurs jobs en même temps (le pool de processus lui-même existait avant la sortie de «asyncio»). Il s'agit d'une méthode permettant de sécuriser certains processus à l'avance et de les réutiliser pour un traitement parallèle. Notez que func1 () et func2 () n'ont pas ```async``. ** C'est juste une fonction ordinaire. ** **

Jusqu'à présent, j'ai utilisé loop.run_in_executor () au lieu d'utiliser asyncio.create_task () ``. Jusqu'à présent, une seule personne, y compris `` main () '', pouvait travailler, et je ne pouvais pas commencer un nouvel emploi à moins d'avoir un tour. Quand asyncio.create_task () est exécuté, main () a le contrôle, donc le processus d'entrée ne démarrera pas immédiatement. D'autre part, les pools de processus ** vous permettent d'exécuter plusieurs travaux en même temps. ** De plus, puisqu'il s'agit d'une trame distincte de main () '', le processus peut être démarré immédiatement lorsque ** `` loop.run_in_executor () '' est appelé. ** Et vous pouvez effectuer plusieurs tâches en même temps, tant que vous disposez d'un nombre réservé de processus.

La valeur par défaut de "Nombre de processus réservés" est le nombre de processeurs CPU (le nombre de cœurs logiques, y compris l'hyperthreading). Comme test

with concurrent.futures.ProcessPoolExecutor() as pool:

Cette partie

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool:

Veuillez le changer en. Func2 () '' ne démarrera pas avant la fin de func1 () ''.

Compte tenu du mécanisme gênant jusqu'à présent, il est devenu assez pratique à utiliser. ** Vous pouvez utiliser ceci (il existe des différences individuelles). ** Arrêtons le tsukkomi que ce n'est plus io. </ s>

Illustré

Cela ressemble à ce qui suit. Le bleu est main () '' est le thread par défaut, et le violet est le processus en cours d'exécution dans le pool de processus. En dehors de main () '', le traitement violet peut être exécuté simultanément jusqu'à `` max_workers '' ou le nombre de processeurs. image.png

Cela ressemble à ceci lorsque max_workers = 1 ''. Puisqu'il n'y a qu'un seul processus violet à la fois, le suivant ne fonctionnera pas tant que task1 '' (`` func1 () '') ne sera pas terminé. image.png

Autres conseils

Je veux obtenir la valeur de retour du traitement

Comme utilisé dans l'exemple précédent de ʻexecutor.py``, la valeur de retour d'une fonction qui s'exécute de manière asynchrone peut être obtenue comme valeur de retour de l'instruction ʻawait``.

return_value.py


import asyncio

async def func1():
    await asyncio.sleep(1)
    return 12345

async def main():
    task1 = asyncio.create_task(func1())
    ret = await task1
    print(ret) # 12345

asyncio.run(main())

Je veux effectuer plusieurs processus en même temps «attendre»

Utilisez `ʻasyncio.gather`` si vous voulez attendre que tout soit terminé au lieu d'attendre un par un. L'exemple de «exécuteur.py» plus tôt est comme ceci.

result1, result2 = await asyncio.gather(task1, task2)

temps libre

Vous pouvez utiliser ʻasyncio.wait_for () `pour interrompre le processus s'il ne se termine pas dans un certain laps de temps. Dans l'exemple suivant, `` func1 () done '' n'est pas exécuté et le programme se termine.

timeout.py


import asyncio

async def func1():
    print("func1() started")
    await asyncio.sleep(10)
    print("func1() finished")

async def main():
    task1 = asyncio.create_task(func1())
    try:
        ret = await asyncio.wait_for(task1, 1)
        print(ret)
    except asyncio.TimeoutError:
        print("timeout...")

asyncio.run(main())

finalement

Commençons par regarder la documentation quand nous en avons besoin. asyncio --- E / S asynchrones - Documentation Python 3.9.0

Recommended Posts