asyncio est une bibliothèque pour écrire du code de ** traitement parallèle ** en utilisant la syntaxe ** async / await **.
asyncio est utilisé comme base pour plusieurs frameworks Python asynchrones tels que des réseaux et des serveurs Web hautes performances, des bibliothèques de connectivité de base de données et des files d'attente de tâches distribuées.
asyncio --- E / S asynchrones - Documentation Python 3.9.0
import asyncio
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
# Python 3.7+
asyncio.run(main())
Qu'est-ce que c'est, n'est-ce pas comme async / await de C # ... Je suis gêné d'utiliser Python, mais je ne le savais que récemment. Parfois, j'écris du code C #, mais chaque fois que je touche async / await, je le voulais aussi pour Python ... Programmation asynchrone en C # | Microsoft Docs
threading.Thread
multiprocessing.Process
C'était une image autour. Parallèle - Documentation Python 3.9.0 Asyncio sera-t-il une option à l'avenir?
Hello World!
L'exemple au début est le code trouvé dans le document officiel ci-dessus. Commençons par déplacer ceci. Comme vous pouvez le voir dans les commentaires du code, cela fonctionne avec ** Python 3.7+ **. Aucune préparation particulière telle que l'installation de bibliothèques supplémentaires n'est requise.
helloworld.py
import asyncio
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
# Python 3.7+
asyncio.run(main())
Terminal
$ python3 helloworld.py
Hello ...
... World!
Assurez-vous qu'il y a une seconde entre Hello ... et ... World! Les points sont les suivants.
main ()
comme une fonction avec ```async`` (** coroutine **)ʻawait`` dans une fonction avec
ʻasync--Appeler ```asyncio.run (main ())
une seule foisMais je ne suis pas content de ça seul, non? Ce n'est pas un traitement asynchrone.
Alors qu'en est-il de ça?
async_sleep1.py
import asyncio
async def func1():
print('func1() started')
await asyncio.sleep(1)
print('func1() finished')
async def func2():
print('func2() started')
await asyncio.sleep(1)
print('func2() finished')
async def main():
task1 = asyncio.create_task(func1())
task2 = asyncio.create_task(func2())
await task1
await task2
asyncio.run(main())
Quand je l'exécute, func1 () '' et
func2 () '' démarrent presque en même temps, et après 1 seconde, func1 () '' et
func2 () '' se terminent presque en même temps. Vous pouvez voir qu'il y en a.
func1() started
func2() started
func1() finished
func2() finished
En gros, «attendre» est une image de ** «donnez votre tour à une autre personne et attendez-vous» **. Ici, le "travail de l'autre personne" est la tâche, qui peut être créée par l'une des méthodes suivantes.
--Spécifiez un processus prédéfini tel que ʻasyncio.sleep ()
`
ʻasync`` avec
ʻasyncio.create_task ()
Il existe une méthode appelée.
Ici, écrire simplement func1 ()
ne fait rien, mais en l'utilisant comme ʻasyncio.create_task (func1 ())
,
func1 ()
`fonctionne. Vous pouvez le mettre en veille afin de pouvoir le démarrer. Le simple fait de dire «veille» ne signifie pas qu'il commencera à fonctionner immédiatement (décrit plus loin).
asyncio.sleep?
Ce qui est inquiétant ici, c'est «attend asyncio.sleep (1)». Je comprends le sens, mais quand il s'agit de "attendre n secondes", `` time.sleep () '' est la norme. Et si je le change ici?
async_sleep2.py
import asyncio
async def func1():
print('func1() started')
#await asyncio.sleep(1)
time.sleep(1)
print('func1() finished')
async def func2():
print('func2() started')
#await asyncio.sleep(1)
time.sleep(1)
print('func2() finished')
async def main():
task1 = asyncio.create_task(func1())
task2 = asyncio.create_task(func2())
await task1
await task2
asyncio.run(main())
Malheureusement, func2 () '' ne démarrera pas avant la fin de
func1 () ''.
func1() started
func1() finished
func2() started
func2() finished
Afin d'exécuter les deux processus en parallèle, il est nécessaire d'arrêter les processus avec «attend asyncio.sleep (1)».
coroutine asyncio.sleep(delay, result=None, *, loop=None)
delay S'arrête pendant une seconde. Si le résultat est fourni, il sera renvoyé à l'appelant une fois le collout terminé. sleep () suspend toujours la tâche en cours et autorise l'exécution d'autres tâches.
Le point est, "Toujours suspendre la tâche en cours et permettre à d'autres tâches de s'exécuter."
Si vous appelez ʻasyncio.sleep ()
`pour attendre, ** abandonnez votre tour si le travail de quelqu'un d'autre arrive pendant que vous attendez. ** En d'autres termes, ** Le simple fait de laisser le processus prendre beaucoup de temps ne permet pas à d'autres processus d'entrer. ** **
Un autre exemple déroutant.
async_sleep3.py
import asyncio
import time
async def func1():
print('func1() started')
await asyncio.sleep(1)
print('func1() finished')
async def func2():
print('func2() started')
time.sleep(2)
print('func2() finished')
async def main():
task1 = asyncio.create_task(func1())
task2 = asyncio.create_task(func2())
await task1
await task2
asyncio.run(main())
func1 ()
utilise ```asyncio.sleep () pour attendre, tandis que
func2 () utilise
time.sleep () . Et
func2 () '' a un temps de sommeil plus long.
Le résultat de cette exécution est ...
func1() started
func2() started
func2() finished
func1() finished
Ce sera comme ça. ** func1 ()
, qui ne devrait dormir que pendant 1 seconde, se termine en quelque sorte après `` func2 () ''. ** **
Revenons maintenant à la description dans le document précédent.
sleep () suspend toujours la tâche en cours et autorise l'exécution d'autres tâches.
** Vous pouvez donner votre tour à quelqu'un d'autre avec ʻasyncio.sleep ()
, mais vous ne pouvez pas interrompre volontairement. ** Si un nouveau processus arrive pendant le sommeil avec
ʻasyncio.sleep (), il passera à cela, mais juste parce que le temps de sommeil est terminé, il est en cours d'exécution (
ʻasyncio.sleep ()Il n'interrompt pas le processus (pas dans
) et retourne le contrôle.
Dans l'exemple précédent, après avoir attendu 1 seconde avec func1 () '',
func2 () '' est toujours en cours d'exécution, alors ne le forcez pas à se relayer. Attendez que `` func2 () '' se termine avant de continuer. C'est une loi. [^ func2]
[^ func2]: Bien sûr, si func2 ()
abandonne l'ordre avec ʻasyncio.sleep ()
,
func1 ()
`peut commencer à fonctionner.
Est-ce que c'est comme ça si vous illustrez le traitement de «async_sleep1.py» qui peut être bien traité en parallèle?
Les zones colorées représentent la durée pendant laquelle vous avez réellement le contrôle.
Dans ce qui suit, ʻasyncio.sleep ()
est abrégé en
ʻaio.sleep () ``.
main () '' met
task1et
task2en attente avec
create_task () ''. task1 '' et
task2 '' ne démarrent pas immédiatement et attendent. task1
reçoit le contrôle. task1
libère le contrôle en exécutant ```asyncio.sleep (1) et attend 1 seconde. La
tâche2 '' reçoit le contrôle. task2
libère le contrôle en exécutant ```asyncio.sleep (1) `` et attend 1 seconde. Aucun traitement ne peut être exécuté. task1
a attendu 1 seconde et reçoit le contrôle. tâche1
est terminé, `` main () '' reprend le contrôle.,
main () abandonne le contrôle et attend que
task2`` se termine. Aucun traitement ne peut être exécuté. task2
attend 1 seconde et reçoit le contrôle. tâche2
est terminé, `` main () '' reprend le contrôle.D'autre part, le processus de `ʻasync_sleep3.py`` sera illustré comme suit.
Si vous utilisez time.sleep () '',
task2 '' est toujours en contrôle. Par conséquent, la tâche1 '' ne peut pas reprendre avant la fin de la
tâche2 '', même si elle a attendu 1 seconde.
Eh bien, par ici, nous remarquons. ** Il n'est pas possible de l'utiliser pour effectuer d'autres traitements en parallèle lors de la rotation du calcul sur la CPU. ** ** Plusieurs processus ne s'exécutent pas en même temps. ** Il s'agit en fait d'un seul thread. ** **
Par exemple, s'il est basé sur `` threading.Thread '', vous pouvez faire ce qui suit. [^ 1]
[^ 1]: Dans ce code, les deux processus fonctionnent en parallèle (apparemment), mais le GIL ne réduit pas le temps de traitement total. Si vous voulez accélérer avec le multi-core, le `` multiprocessing.Process '' est mieux. → J'ai étudié GIL que vous devriez savoir si vous faites un traitement parallèle avec Python --Qiita
example_threading.py
import threading
def func1():
print('func1() started')
x = 0
for i in range(1000000):
x += i
print('func1() finished:', x)
def func2():
print('func2() started')
x = 0
for i in range(1000000):
x += i * 2
print('func2() finished:', x)
def main():
thread1 = threading.Thread(target=func1)
thread2 = threading.Thread(target=func2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
main()
func1() started
func2() started
func1() finished: 499999500000
func2() finished: 999999000000
** io ** C'est vrai. En d'autres termes, on peut dire qu'il s'agit d'un mécanisme de traitement (apparemment) parallèle, qui consiste principalement en un traitement en attente d'E / S (entrée / sortie) depuis le début.
Cependant, pour effectuer d'autres traitements entre-temps, il est nécessaire d'attendre en utilisant une fonction dédiée, et même time.sleep () '' ne peut pas être utilisé. Je pense que l'attente d'E / S qui apparaît souvent est la transmission et la réception du réseau, mais en déduisant des résultats jusqu'à présent, contrairement à **
threading.Thread '' etc., écrivez le code d'entrée / sortie normalement Cependant, il est peu probable que ce soit un traitement parallèle, alors comment devrions-nous l'utiliser ...? ** **
Ce que vous pouvez voir jusqu'ici, c'est que pour effectuer des entrées / sorties de manière asynchrone (apparemment en parallèle), il est nécessaire de ** "donner de l'ordre à d'autres traitements en attendant les entrées / sorties" **. C'est.
Par conséquent, il existe une fonction dans ʻasyncio`` qui gère le processus d'abandon de l'ordre du temps d'attente d'entrée / sortie, comme
`sleep``.
Essayons l'exemple de la communication socket dans le document suivant. Streams - Documentation Python 3.9.0
stream.py
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Si vous l'exécutez avec une URL attachée à l'argument de ligne de commande comme indiqué ci-dessous, un en-tête HTTP sera renvoyé.
$ python3 stream.py https://www.google.com/
HTTP header> HTTP/1.0 200 OK
HTTP header> Content-Type: text/html; charset=ISO-8859-1
(Omis ci-dessous)
Cependant, cela seul n'est ni asynchrone ni rien, donc ce n'est pas intéressant, alors exécutons le processus qui utilise le processeur en arrière-plan en attendant.
stream2.py
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
async def cpu_work():
x = 0
for i in range(1000000):
x += i
if i % 100000 == 0:
print(x)
await asyncio.sleep(0.01)
async def main(url):
task1 = asyncio.create_task(print_http_headers(url))
task2 = asyncio.create_task(cpu_work())
await task1
await task2
url = sys.argv[1]
asyncio.run(main(url))
Comme indiqué ci-dessous, le calcul est effectué dans les coulisses jusqu'à ce qu'une réponse soit renvoyée du serveur, lorsqu'elle est renvoyée, le traitement est effectué, et lorsque le traitement est terminé, le reste du calcul est effectué.
0
5000050000
20000100000
45000150000
80000200000
HTTP header> HTTP/1.0 200 OK
HTTP header> Content-Type: text/html; charset=ISO-8859-1
(Omis)
125000250000
180000300000
245000350000
320000400000
405000450000
```Open_connection () et
readline () abandonnent le contrôle et abandonnent les autres jusqu'à ce qu'une réponse soit renvoyée. Le
cpu_work () '' (`` task2 '') qui a reçu l'ordre utilise le CPU pour effectuer des calculs, mais essaie périodiquement de donner l'ordre à une autre personne. Si personne ne vient, continuez le calcul, si quelqu'un vient, abandonnez le tour et attendez que le travail de l'autre personne soit terminé.
Le but ici est d'écrire ```await asyncio.sleep (0.01) dans
cpu_work () , et si vous oubliez cela, même si une réponse est retournée du serveur lors du calcul, elle sera traitée. Je ne peux pas. <s> Ancient </ s> Réminiscent du
DoEvents () '' qui était dans Visual Basic 6.0. Lors du traitement de boucle lourde, si vous ne l'appelez pas dans la boucle, la fenêtre se fige sans réponse. Si vous ne venez pas avec une épingle, demandez à la personne à la maison. </ s>
Cependant, jusqu'à présent, il s'agit d'un seul fil. Vous pouvez écrire avec `ʻawait`` pour exécuter plusieurs processus de rotation du processeur en même temps. De plus, même si le processus implique l'attente des entrées / sorties, il est assez gênant d'être conscient de «attend» à chaque fois que «read» ou «write», donc je pense que cela peut être amusant. Je vais.
Une façon de faire est d'utiliser `` run_in_executor () '' pour remplir le pool de processus (ou pool de threads).
executor.py
import asyncio
import concurrent.futures
def func1():
print("func1() started")
s = sum(i for i in range(10 ** 7))
print("func1() finished")
return s
def func2():
print("func2() started")
s = sum(i * i for i in range(10 ** 7))
print("func2() finished")
return s
async def main():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
task1 = loop.run_in_executor(pool, func1)
task2 = loop.run_in_executor(pool, func2)
result1 = await task1
result2 = await task2
print('result:', result1, result2)
asyncio.run(main())
Si vous faites cela, vous verrez que func1 () '' et
func2 () '' s'exécutent en même temps, comme indiqué ci-dessous. [^ 3]
[^ 3]: Uniquement lorsque le nombre de cœurs logiques est égal ou supérieur à 2. Mais à moins que vous n'utilisiez des services cloud comme VPS ou AWS ces jours-ci, vous aurez presque certainement plus de deux cœurs.
func1() started
func2() started
func1() finished
func2() finished
result: 49999995000000 333333283333335000000
En fait, vous pouvez utiliser un mécanisme appelé «pool de processus» pour exécuter plusieurs jobs en même temps (le pool de processus lui-même existait avant la sortie de «asyncio»). Il s'agit d'une méthode permettant de sécuriser certains processus à l'avance et de les réutiliser pour un traitement parallèle.
Notez que func1 ()
et func2 ()
n'ont pas ```async``. ** C'est juste une fonction ordinaire. ** **
Jusqu'à présent, j'ai utilisé loop.run_in_executor ()
au lieu d'utiliser asyncio.create_task () ``. Jusqu'à présent, une seule personne, y compris `` main () '', pouvait travailler, et je ne pouvais pas commencer un nouvel emploi à moins d'avoir un tour. Quand
asyncio.create_task () est exécuté,
main () a le contrôle, donc le processus d'entrée ne démarrera pas immédiatement. D'autre part, les pools de processus ** vous permettent d'exécuter plusieurs travaux en même temps. ** De plus, puisqu'il s'agit d'une trame distincte de
main () '', le processus peut être démarré immédiatement lorsque ** `` loop.run_in_executor () '' est appelé. ** Et vous pouvez effectuer plusieurs tâches en même temps, tant que vous disposez d'un nombre réservé de processus.
La valeur par défaut de "Nombre de processus réservés" est le nombre de processeurs CPU (le nombre de cœurs logiques, y compris l'hyperthreading). Comme test
with concurrent.futures.ProcessPoolExecutor() as pool:
Cette partie
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool:
Veuillez le changer en. Func2 () '' ne démarrera pas avant la fin de
func1 () ''.
Compte tenu du mécanisme gênant jusqu'à présent, il est devenu assez pratique à utiliser. ** Vous pouvez utiliser ceci (il existe des différences individuelles). ** Arrêtons le tsukkomi que ce n'est plus io. </ s>
Cela ressemble à ce qui suit. Le bleu est main () '' est le thread par défaut, et le violet est le processus en cours d'exécution dans le pool de processus. En dehors de
main () '', le traitement violet peut être exécuté simultanément jusqu'à `` max_workers '' ou le nombre de processeurs.
Cela ressemble à ceci lorsque max_workers = 1 ''. Puisqu'il n'y a qu'un seul processus violet à la fois, le suivant ne fonctionnera pas tant que
task1 '' (`` func1 () '') ne sera pas terminé.
Comme utilisé dans l'exemple précédent de ʻexecutor.py``, la valeur de retour d'une fonction qui s'exécute de manière asynchrone peut être obtenue comme valeur de retour de l'instruction
ʻawait``.
return_value.py
import asyncio
async def func1():
await asyncio.sleep(1)
return 12345
async def main():
task1 = asyncio.create_task(func1())
ret = await task1
print(ret) # 12345
asyncio.run(main())
Utilisez `ʻasyncio.gather`` si vous voulez attendre que tout soit terminé au lieu d'attendre un par un. L'exemple de «exécuteur.py» plus tôt est comme ceci.
result1, result2 = await asyncio.gather(task1, task2)
Vous pouvez utiliser ʻasyncio.wait_for ()
`pour interrompre le processus s'il ne se termine pas dans un certain laps de temps.
Dans l'exemple suivant, `` func1 () done '' n'est pas exécuté et le programme se termine.
timeout.py
import asyncio
async def func1():
print("func1() started")
await asyncio.sleep(10)
print("func1() finished")
async def main():
task1 = asyncio.create_task(func1())
try:
ret = await asyncio.wait_for(task1, 1)
print(ret)
except asyncio.TimeoutError:
print("timeout...")
asyncio.run(main())
Commençons par regarder la documentation quand nous en avons besoin. asyncio --- E / S asynchrones - Documentation Python 3.9.0
Recommended Posts