[Python] Ein Memo, das ich versucht habe, mit Asyncio zu beginnen

Was ist Asyncio?

asyncio ist eine Bibliothek zum Schreiben von ** Parallelverarbeitungscode ** mit der Syntax ** async / await **.

asyncio wird als Grundlage für mehrere asynchrone Python-Frameworks wie Hochleistungsnetzwerke und Webserver, Datenbankkonnektivitätsbibliotheken und verteilte Aufgabenwarteschlangen verwendet.

asyncio --- Asynchrone E / A - Python 3.9.0-Dokumentation

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

Was ist das, ist es nicht wie C # asynchron / warten ... Es ist mir peinlich, Python zu verwenden, aber ich wusste es bis vor kurzem nicht. Manchmal schreibe ich C # -Code, aber jedes Mal, wenn ich asynchron / warte, wollte ich das auch für Python ... Asynchrone Programmierung in C # | Microsoft Docs

Apropos asynchrone Verarbeitung in Python

Es war ein Bild herum. Parallel - Python 3.9.0-Dokumentation Wird Asyncio in Zukunft eine Option sein?

Überprüfungsumgebung

Hello World!

Das Beispiel am Anfang ist der Code, der im obigen offiziellen Dokument enthalten ist. Lassen Sie uns dies zunächst verschieben. Wie Sie in den Codekommentaren sehen können, funktioniert es mit ** Python 3.7+ **. Es sind keine besonderen Vorbereitungen wie die Installation zusätzlicher Bibliotheken erforderlich.

helloworld.py


import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

Terminal


$ python3 helloworld.py
Hello ...
... World!

Stellen Sie sicher, dass zwischen Hello ... und ... World! 1 Sekunde liegt. Die Punkte sind wie folgt.

--Schreiben Sie den Prozess, der "main ()" entspricht, als Funktion mit "async" (** coroutine **).

Aber damit bin ich allein nicht zufrieden, oder? Es ist keine asynchrone Verarbeitung.

Beispiel für eine ordnungsgemäße asynchrone Verarbeitung

Was ist damit?

async_sleep1.py


import asyncio

async def func1():
    print('func1() started')
    await asyncio.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    await asyncio.sleep(1)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())

Wenn ich es ausführe, beginnen func1 () und func2 () fast zur gleichen Zeit, und nach 1 Sekunde enden func1 () und func2 () fast zur gleichen Zeit. Sie können sehen, dass es gibt.

func1() started
func2() started
func1() finished
func2() finished

Grob gesagt ist "Warten" ein Bild von "Geben Sie einer anderen Person den Zug und warten Sie auf sich selbst". Hier ist die "andere Person (Arbeit)" die Aufgabe, die mit einer der folgenden Methoden erstellt werden kann.

Es gibt eine Methode namens. Hier passiert nichts, indem man einfach func1 () schreibt, sondern indem man es wie asyncio.create_task (func1 ()) verwendet, func1 () funktioniert. Sie können es in den Standby-Modus versetzen, um es zu starten. Nur "Standby" zu sagen bedeutet nicht, dass es sofort funktioniert (später beschrieben).

asyncio.sleep?

Was hier besorgniserregend ist, ist "warte auf asyncio.sleep (1)". Ich verstehe die Bedeutung, aber wenn es darum geht, "auf n Sekunden zu warten", ist "time.sleep ()" der Standard. Was ist, wenn ich es hier ändere?

async_sleep2.py


import asyncio

async def func1():
    print('func1() started')
    #await asyncio.sleep(1)
    time.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    #await asyncio.sleep(1)
    time.sleep(1)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())

Leider startet func2 () erst, wenn func1 () endet.

func1() started
func1() finished
func2() started
func2() finished

Um die beiden Prozesse parallel auszuführen, müssen die Prozesse mit wait asyncio.sleep (1) gestoppt werden.

coroutine asyncio.sleep(delay, result=None, *, loop=None)

Verzögerung Stoppt für eine Sekunde. Wenn das Ergebnis angegeben wird, wird es nach Abschluss des Collouts an den Anrufer zurückgegeben. sleep () unterbricht immer die aktuelle Aufgabe und lässt andere Aufgaben laufen.

Der Punkt lautet: "Unterbrechen Sie immer die aktuelle Aufgabe und lassen Sie andere Aufgaben ausführen." Wenn Sie "asyncio.sleep ()" aufrufen und warten, ** geben Sie Ihren Zug auf, wenn die Arbeit eines anderen eingeht, während Sie warten. ** Mit anderen Worten, ** Nur wenn der Prozess lange dauert, können keine anderen Prozesse eingehen. ** ** **

Ein weiteres verwirrendes Beispiel.

async_sleep3.py


import asyncio
import time

async def func1():
    print('func1() started')
    await asyncio.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    time.sleep(2)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())

func1 () verwendet asyncio.sleep () um zu warten, während func2 () time.sleep () verwendet. Und func2 () hat eine längere Schlafzeit. Das Ergebnis dieser Ausführung ist ...

func1() started
func2() started
func2() finished
func1() finished

Es wird so sein. ** func1 (), das nur 1 Sekunde schlafen sollte, endet irgendwie nach func2 (). ** ** **

Lassen Sie uns nun die Beschreibung im vorherigen Dokument noch einmal durchgehen.

sleep () unterbricht immer die aktuelle Aufgabe und lässt andere Aufgaben laufen.

** Sie können mit asyncio.sleep () jemand anderem an die Reihe kommen, aber Sie können nicht freiwillig unterbrechen. ** Wenn ein neuer Prozess während des Schlafens mit asyncio.sleep () eingeht, wird er darauf umgeschaltet, aber nur weil die Ruhezeit abgelaufen ist, wird er ausgeführt (asyncio.sleep () ` Es unterbricht den Prozess nicht (nicht in `) und gibt die Steuerung zurück. Im vorherigen Beispiel wird nach 1 Sekunde Wartezeit mit func1 () func2 () immer noch ausgeführt. Erzwingen Sie also nicht, dass es sich abwechselt. Warten Sie, bis func2 () `` beendet ist, bevor Sie fortfahren. Es ist ein Gesetz. [^ func2]

[^ func2]: Wenn func2 () die Bestellung mit asyncio.sleep () aufgibt, kann func1 () natürlich funktionieren.

Illustriert basierend auf dem oben genannten

Ist es so, wenn Sie die Verarbeitung von "async_sleep1.py" veranschaulichen, die gut parallel verarbeitet werden kann? Die farbigen Bereiche geben an, wie lange Sie tatsächlich die Kontrolle haben. Im Folgenden wird "asyncio.sleep ()" als "aio.sleep ()" abgekürzt.

image.png

  1. main () versetzt task1 und task2 mit create_task () in den Standby-Modus. task1 und task2 starten nicht sofort und warten.
  2. Mit await gibt main () die Kontrolle auf und wartet, bis task1 beendet ist. task1 erhält die Kontrolle.
  3. task1 gibt die Kontrolle durch Ausführen von asyncio.sleep (1) frei und wartet 1 Sekunde. task2 erhält die Kontrolle.
  4. task2 gibt die Kontrolle durch Ausführen von asyncio.sleep (1) frei und wartet 1 Sekunde. Es kann keine Verarbeitung ausgeführt werden.
  5. task1 wartet 1 Sekunde und erhält die Kontrolle.
  6. Nachdem die Verarbeitung von task1 abgeschlossen ist, erlangt main () die Kontrolle zurück.
  7. Mit await gibt main () die Kontrolle auf und wartet, bis task2 beendet ist. Es kann keine Verarbeitung ausgeführt werden.
  8. task2 wartet 1 Sekunde und erhält die Kontrolle.
  9. Nachdem die Verarbeitung von task2 abgeschlossen ist, erlangt main () die Kontrolle zurück.
  10. Die Verarbeitung von main () endet.

Andererseits wird der Prozess von "async_sleep3.py" wie folgt dargestellt.

image.png

Wenn Sie time.sleep () verwenden, hat task2 immer noch die Kontrolle. Obwohl task1 1 Sekunde gewartet hat, kann es daher nicht neu gestartet werden, bis task2 beendet ist.

Nun, hier bemerken wir. ** Es ist nicht möglich, damit eine andere Verarbeitung parallel durchzuführen, während die Berechnung auf der CPU gedreht wird. ** ** ** Es werden nicht mehrere Prozesse gleichzeitig ausgeführt. ** Dies ist eigentlich ein einzelner Thread. ** ** **

Wenn es beispielsweise auf "threading.Thread" basiert, können Sie Folgendes tun. [^ 1]

[^ 1]: In diesem Code arbeiten die beiden Prozesse (anscheinend) parallel, aber die GIL reduziert die Gesamtverarbeitungszeit nicht. Wenn Sie mit Multi-Core beschleunigen möchten, ist multiprocessing.Process besser. → Ich habe GIL untersucht, dass Sie wissen sollten, ob Sie so viel wie möglich parallel mit Python verarbeiten - Qiita

example_threading.py


import threading

def func1():
    print('func1() started')
    x = 0
    for i in range(1000000):
        x += i
    print('func1() finished:', x)

def func2():
    print('func2() started')
    x = 0
    for i in range(1000000):
        x += i * 2
    print('func2() finished:', x)

def main():
    thread1 = threading.Thread(target=func1)
    thread2 = threading.Thread(target=func2)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

main()
func1() started
func2() started
func1() finished: 499999500000
func2() finished: 999999000000

Wenn Sie sich den Namen asyncio genau ansehen

** io ** Das stimmt. Mit anderen Worten kann gesagt werden, dass es sich um einen Mechanismus für die (scheinbar) parallele Verarbeitung handelt, bei der hauptsächlich von Anfang an auf E / A (Eingabe / Ausgabe) gewartet wird. Um jedoch in der Zwischenzeit eine andere Verarbeitung durchzuführen, muss mit einer dedizierten Funktion gewartet werden, und selbst "time.sleep ()" kann nicht verwendet werden. Ich denke, dass die E / A-Wartezeit, die häufig auftritt, das Senden und Empfangen des Netzwerks ist, aber im Gegensatz zu ** threading.Thread usw. aus den bisherigen Ergebnissen schließen, schreiben Sie den Eingabe- / Ausgabecode normal Es ist jedoch unwahrscheinlich, dass es sich um eine Parallelverarbeitung handelt. Wie sollten wir sie also verwenden ...? ** ** **

Eingabe und Ausgabe parallelisieren

Was wir bisher sehen können, ist, dass es zur asynchronen (scheinbar parallelen) Eingabe / Ausgabe erforderlich ist, ** "einer anderen Verarbeitung Befehl zu erteilen, während auf Eingabe / Ausgabe gewartet wird" **. Das ist. Daher gibt es in asyncio eine Funktion, die eine Verarbeitung ausführt, die die Reihenfolge der Eingabe- / Ausgabe-Wartezeit aufgibt, wie z. B. sleep.

Versuchen wir das Beispiel der Socket-Kommunikation im folgenden Dokument. Streams - Python 3.9.0-Dokumentation

stream.py


import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Wenn Sie es mit einer URL ausführen, die wie unten gezeigt an das Befehlszeilenargument angehängt ist, wird ein HTTP-Header zurückgegeben.

$ python3 stream.py https://www.google.com/
HTTP header> HTTP/1.0 200 OK
HTTP header> Content-Type: text/html; charset=ISO-8859-1
(Unten weggelassen)

Dies allein ist jedoch weder asynchron noch irgendetwas, daher ist es nicht interessant. Lassen Sie uns also den Prozess ausführen, der die CPU hinter den Kulissen verwendet, während Sie warten.

stream2.py


import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

async def cpu_work():
    x = 0
    for i in range(1000000):
        x += i
        if i % 100000 == 0:
            print(x)
            await asyncio.sleep(0.01)

async def main(url):
    task1 = asyncio.create_task(print_http_headers(url))
    task2 = asyncio.create_task(cpu_work())
    await task1
    await task2

url = sys.argv[1]
asyncio.run(main(url))

Wie unten gezeigt, wird die Berechnung hinter den Kulissen ausgeführt, bis eine Antwort vom Server zurückgegeben wird, wenn sie zurückgegeben wird, die Verarbeitung ausgeführt wird und wenn die Verarbeitung abgeschlossen ist, wird der Rest der Berechnung ausgeführt.

0
5000050000
20000100000
45000150000
80000200000
HTTP header> HTTP/1.0 200 OK
HTTP header> Content-Type: text/html; charset=ISO-8859-1
(Weggelassen)
125000250000
180000300000
245000350000
320000400000
405000450000

open_connection () und readline () geben die Kontrolle auf und geben sie an andere ab, bis eine Antwort zurückgegeben wird. Das cpu_work () ( task2), das den Befehl erhalten hat, verwendet die CPU, um Berechnungen durchzuführen, versucht jedoch regelmäßig, den Befehl einer anderen Person zu erteilen. Wenn niemand kommt, setzen Sie die Berechnung fort. Wenn jemand kommt, geben Sie die Runde auf und warten Sie, bis die Arbeit der anderen Person beendet ist.

Hier geht es darum, wait asyncio.sleep (0.01) in cpu_work () zu schreiben. Wenn Sie dies vergessen, wird diese verarbeitet, auch wenn während der Berechnung eine Antwort vom Server zurückgegeben wird. Ich kann nicht Ancient </ s> Erinnert an die DoEvents () in Visual Basic 6.0. Wenn Sie bei der Verarbeitung schwerer Schleifen diese nicht in der Schleife aufrufen, friert das Fenster ohne Antwort ein. Wenn Sie nicht mit einer Stecknadel kommen, fragen Sie die Person zu Hause. </ s>

Ich möchte wirklich parallel verarbeiten

Bisher ist es jedoch ein einzelner Thread. Möglicherweise möchten Sie mit "Warten" schreiben, um mehrere CPU-Turning-Prozesse gleichzeitig auszuführen. Auch wenn der Prozess das Warten auf Eingabe / Ausgabe beinhaltet, ist es ziemlich mühsam, sich jedes Mal, wenn "Lesen" oder "Schreiben", des "Wartens" bewusst zu sein, so dass ich denke, dass es Spaß machen kann. Ich werde.

Eine Möglichkeit, dies zu tun, besteht darin, run_in_executor () zu verwenden, um den Prozesspool (oder Thread-Pool) zu füllen.

executor.py


import asyncio
import concurrent.futures

def func1():
    print("func1() started")
    s = sum(i for i in range(10 ** 7))
    print("func1() finished")
    return s

def func2():
    print("func2() started")
    s = sum(i * i for i in range(10 ** 7))
    print("func2() finished")
    return s

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        task1 = loop.run_in_executor(pool, func1)
        task2 = loop.run_in_executor(pool, func2)
        result1 = await task1
        result2 = await task2
        print('result:', result1, result2)

asyncio.run(main())

Wenn Sie dies tun, werden Sie sehen, dass func1 () und func2 () gleichzeitig ausgeführt werden, wie unten gezeigt. [^ 3]

[^ 3]: Nur wenn die Anzahl der logischen Kerne 2 oder mehr beträgt. Wenn Sie heutzutage keine Cloud-Dienste wie VPS oder AWS verwenden, verfügen Sie mit ziemlicher Sicherheit über mehr als zwei Kerne.

func1() started
func2() started
func1() finished
func2() finished
result: 49999995000000 333333283333335000000

Tatsächlich können Sie einen Mechanismus namens "Prozesspool" verwenden, um mehrere Jobs gleichzeitig auszuführen (der Prozesspool selbst war vorhanden, bevor "asyncio" herauskam). Dies ist eine Methode, um mehrere Prozesse im Voraus zu sichern und für die parallele Verarbeitung wiederzuverwenden. Beachten Sie, dass func1 () und func2 () kein async haben. ** Es ist nur eine gewöhnliche Funktion. ** ** **

Bisher habe ich "asyncio.create_task ()" verwendet, um die Verarbeitung durchzuführen, aber jetzt verwende ich "loop.run_in_executor ()". Bis jetzt konnte nur eine Person, einschließlich main (), arbeiten, und ich konnte keinen neuen Job beginnen, wenn ich nicht an der Reihe war. Wenn asyncio.create_task () ausgeführt wird, hat main () selbst die Kontrolle, sodass der Eingabeprozess nicht sofort gestartet wird. Auf der anderen Seite können Sie mit ** Prozesspools mehrere Jobs gleichzeitig ausführen. ** Da es sich um einen von main () getrennten Frame handelt, kann der Prozess sofort gestartet werden, wenn ** loop.run_in_executor () aufgerufen wird. ** Und Sie können mehrere Jobs gleichzeitig ausführen, solange Sie eine reservierte Anzahl von Prozessen haben.

Der Standardwert von "Anzahl reservierter Prozesse" ist die Anzahl der CPU-Prozessoren (die Anzahl der logischen Kerne einschließlich Hyperthreading). Als Test

with concurrent.futures.ProcessPoolExecutor() as pool:

Dieser Teil

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool:

Bitte ändern Sie es in. Func2 () startet erst, wenn func1 () endet.

In Anbetracht des bisher problematischen Mechanismus ist die Verwendung recht bequem geworden. ** Möglicherweise möchten Sie dies verwenden (es gibt individuelle Unterschiede). ** Lassen Sie uns den Tsukkomi stoppen, dass es nicht mehr io ist. </ s>

Illustriert

Es sieht wie folgt aus. Blau ist main () ist der Standardthread und lila ist der Prozess, der im Prozesspool ausgeführt wird. Abgesehen von main () kann die Purple-Verarbeitung gleichzeitig bis zu max_workers oder der Anzahl der Prozessoren ausgeführt werden. image.png

Es sieht so aus, wenn max_workers = 1. Da es jeweils nur einen lila Prozess gibt, funktioniert der nächste erst, wenn task1 (func1 ()) beendet ist. image.png

Andere Tipps

Ich möchte den Rückgabewert der Verarbeitung erhalten

Wie ich im vorherigen Beispiel von "executeor.py" verwendet habe, kann der Rückgabewert der asynchron ausgeführten Funktion als Rückgabewert der Anweisung "await" erhalten werden.

return_value.py


import asyncio

async def func1():
    await asyncio.sleep(1)
    return 12345

async def main():
    task1 = asyncio.create_task(func1())
    ret = await task1
    print(ret) # 12345

asyncio.run(main())

Ich möchte mehrere Prozesse gleichzeitig abwarten

Verwenden Sie asyncio.gather, wenn Sie warten möchten, bis alles erledigt ist, anstatt einzeln zu warten. Das vorherige Beispiel von executeor.py sieht so aus.

result1, result2 = await asyncio.gather(task1, task2)

Auszeit

Sie können asyncio.wait_for () verwenden, um den Prozess zu unterbrechen, wenn er nicht in einer bestimmten Zeit abgeschlossen ist. Im folgenden Beispiel wird func1 () beendet nicht ausgeführt und das Programm endet.

timeout.py


import asyncio

async def func1():
    print("func1() started")
    await asyncio.sleep(10)
    print("func1() finished")

async def main():
    task1 = asyncio.create_task(func1())
    try:
        ret = await asyncio.wait_for(task1, 1)
        print(ret)
    except asyncio.TimeoutError:
        print("timeout...")

asyncio.run(main())

Schließlich

Schauen wir uns zunächst die Dokumentation an, wenn wir sie brauchen. asyncio --- Asynchrone E / A - Python 3.9.0-Dokumentation

Recommended Posts