Vollständiges Verständnis der asynchronen Python-Programmierung

Asynchrone Programmierung

Ich bin sicher, dass viele Leute von asynchroner Programmierung gehört haben. Beispielsweise ist das im Front-End verwendete JavaScript eine Single-Thread-Sprache, die den Haupt-Thread nicht blockiert, sodass verschiedene Funktionen asynchron implementiert werden. Node.js erbt auch diese Eigenschaft und eignet sich gut für E / A-gebundene Aufgaben. Wenn es jedoch um Python geht, unterstützt es die parallele und parallele Verarbeitung, sodass die meisten Benutzer keine Erfahrung mit der Verwendung von asynchroner Programmierung in ihren eigenen Projekten haben. Natürlich Tornado, Twisted und [Gevent](http: //www.gevent). Viele Leute haben asynchrone Frameworks wie org /) verwendet, weil sie berühmt sind, aber wenn Sie auf seltsame Fehler stoßen, können Sie diese nicht lösen.

Wie Sie den jüngsten Trends in PyCon entnehmen können, ist die asynchrone Programmierung wohl der nächste Trend im Python-Ökosystem. Darüber hinaus verkaufen aufstrebende Programmiersprachen wie Go und Rust asynchrone Verarbeitung und leistungsstarke Parallelverarbeitung. Da Python nicht besiegt werden darf, begann Guido, der Schöpfer von Python, 2013 selbst mit der Entwicklung des Tulip (asyncio) -Projekts.

1. Was ist asynchrone Verarbeitung?

Zunächst möchte ich das zugehörige Konzept und dann die asynchrone Verarbeitung erläutern.

1-1. Blockieren

Blockieren ist überall. Die CPU ist beispielsweise "[Context Switch](https://ja.wikipedia.org/wiki/%E3%82%B3%E3%83%B3%E3%83%86%E3%82%AD%E3%". 82% B9% E3% 83% 88% E3% 82% B9% E3% 82% A4% E3% 83% 83% E3% 83% 81 #: ~: Text =% E3% 82% B3% E3% 83% B3% E3% 83% 86% E3% 82% AD% E3% 82% B9% E3% 83% 88% E3% 82% B9% E3% 82% A4% E3% 83% 83% E3% 83% 81% 20 (Kontext% 20Schalter)% 20% E3% 81% A8,% E4% B8% 8D% E5% 8F% AF% E6% AC% A0% E3% 81% AA% E6% A9% 9F% E8% 83% Wenn Sie "BD% E3% 81% A7% E3% 81% 82% E3% 82% 8B% E3% 80% 82)" ausführen, können andere Prozesse nicht verarbeitet werden und werden blockiert. (Im Fall von Multi-Core ist der Core mit Kontextwechsel nicht mehr verfügbar.)

1-2. Nicht blockierend

Das Nicht-Blockieren ist die entgegengesetzte Seite des Blockierens, und das Blockieren eines bestimmten Prozesses verringert die Berechnungseffizienz, so dass der Prozess nicht blockiert wird.

1-3 Synchrone Verarbeitung

1-4 Asynchrone Verarbeitung

Die obige Kommunikationsmethode bezieht sich auf "Synchronisationsprimitiv" bei asynchroner Verarbeitung und paralleler Verarbeitung. Zum Beispiel Semapho, Sperre, Synchronisierungswarteschlange und so weiter. Diese Kommunikationsmethoden dienen zum Synchronisieren mehrerer Programme unter bestimmten Bedingungen. Und weil es eine asynchrone Verarbeitung gibt, sind diese Kommunikationsmethoden notwendig. Der Grund dafür ist, dass alle Programme, die eine synchrone Verarbeitung ausführen, von Anfang an in der richtigen Reihenfolge verarbeitet werden, sodass keine Kommunikationsmethode erforderlich ist.

1-5 Parallelverarbeitung

1-6 Parallelverarbeitung

Informationen zur parallelen / parallelen Verarbeitung finden Sie in Vorheriger Artikel.

1-7. Zusammenfassung des Konzepts

Das Programm sollte in mehrere Aufgaben aufgeteilt werden, um die Parallelität zu unterstützen. Blockieren / Nichtblockieren und Synchron / Asynchron werden für jede Aufgabe definiert. Daher sind parallel, asynchron und nicht blockierend eng miteinander verbunden.

1-8 Asynchrone Programmierung

Synchron / asynchron und blockierend / nicht blockierend sind nicht inkompatibel. Beispielsweise führt eine EC-Site eine asynchrone Verarbeitung für Mehrbenutzerzugriffsanforderungen durch, Inventaraktualisierungen müssen jedoch synchron verarbeitet werden.

1-9. Schwierigkeiten bei der asynchronen Programmierung

Infolgedessen vereinfachen die meisten asynchronen Frameworks das asynchrone Programmiermodell (wobei jeweils nur ein Ereignis zulässig ist). Die Diskussionen über asynchrone Programmierung konzentrieren sich auf Single-Threaded-Diskussionen.

Wenn Sie also asynchrone Programmierung verwenden, müssen Sie jeden asynchronen Aufruf verkleinern. "Klein" bedeutet hier, die Berechnungszeit zu verkürzen. Und wie man das Programm in asynchrone Aufgaben aufteilt, wird zu einer Herausforderung.

2. Gründe für die Einführung der asynchronen Programmierung

Wie bereits erwähnt, weist die asynchrone Programmierung viele Nachteile auf. "Asyncio", das der Ersteller von Python über vier Jahre selbst erstellt hat, wurde zu einer Standardbibliothek in Python 3.6. Aber warum müssen wir es so schwer machen? Der Grund ist, dass asynchrone Programmierung sehr nützlich ist.

2-1. CPU-Zeiterfassung

Unter der Annahme, dass die Anzahl der CPU-Takte "2,6 GHz" beträgt, ist es möglich, Befehle von $ 2,6 \ mal 10 ^ 9 $ pro Sekunde zu verarbeiten, und die für jede Befehlsverarbeitung erforderliche Zeit beträgt "0,38 ns". 0.38ns ist die kleinste Einheit im Sinne der Zeit der CPU. Und die kleinste Einheit des menschlichen Zeitgefühls sei "1s". Die folgende Tabelle ist eine Berechnung des Zeitgefühls der CPU basierend auf dem menschlichen Zeitgefühl. Außerdem beziehen sich die CPU-Verzögerungsdaten auf "Latenzzahlen, die jeder Programmierer kennen sollte".

wird bearbeitet Tatsächliche Verzögerung CPU-Zeitsinn
Anweisungsverarbeitung 0.38ns 1s
L1-Cache lesen 0.5ns 1.3s
Korrektur der Verzweigungsvorhersage 5ns 13s
L2-Cache lesen 7ns 18.2s
Ausschlusskontrolle 25ns 1m5s
Speicherreferenz 100ns 4m20s
Kontextwechsel 1.5µs 1h5m
Laden Sie 2 KB Daten in ein 1-Gbit / s-Netzwerk hoch 20µs 14.4h
Lesen Sie 1 MB fortlaufende Daten aus dem Speicher 250µs 7.5day
Pingen Sie den Host desselben Internet-Rechenzentrums an (Hin- und Rückfahrt). 0.5ms 15day
Lesen Sie 1 MB fortlaufende Daten von der SSD 1ms 1month
Lesen Sie 1 MB fortlaufende Daten von der Festplatte 20ms 20month
Pingen Sie einen Gastgeber in einer anderen Präfektur an (Hin- und Rückfahrt) 150ms 12.5year
Starten Sie die virtuelle Maschine neu 4s 300year
Starten Sie den Server neu 5m 25000year

Die CPU ist der Verarbeitungskern des Computers und eine wertvolle Ressource. Wenn Sie CPU-Ausführungszeit verschwenden und die Auslastung reduzieren, wird Ihr Programm unweigerlich weniger effizient. Wie die obige Tabelle zeigt, entspricht das Hochladen von 2 KB Daten über ein 1-Gbit / s-Netzwerk einem Stundenaufwand von 14 Stunden im Sinne der CPU. Wenn es sich um ein 10-Mbit / s-Netzwerk handelt, ist die Effizienz 100-mal niedriger. Der Vorgang, die CPU einfach so lange warten zu lassen und nicht auf eine andere Verarbeitung zu übertragen, ist nur eine Verschwendung der "Jugend" der CPU.

2-2. Echte Probleme

Wenn ein Programm Computercomputer-Ressourcen nicht gut nutzt, benötigt es mehr Computer, um die Lücke zu schließen. Durch die Neugestaltung eines Scraping-Programms mit asynchroner Programmierung können beispielsweise die ursprünglich erforderlichen 7 Server auf 3 reduziert und die Kosten um 57% gesenkt werden. Bei AWS kostet eine reservierte Instanz von m4.xlarge übrigens etwa 150.000 Yen pro Jahr.

Wenn Sie sich nicht für Geld interessieren, ist Ihnen Effizienz wirklich wichtig. Wenn Sie die Anzahl der Server auf eine bestimmte Anzahl erhöhen, müssen Sie die Architektur und das Programmdesign verbessern, oder selbst wenn Sie sie weiter erhöhen, wird die Leistung möglicherweise nicht verbessert. Und die Verwaltungskosten werden überwiegend erhöht. Wenn Sie eine Reservierung für PS5 oder XBOX Series X vornehmen, gibt die EC-Site möglicherweise einen 503-Fehler aus, da es sich eher um ein Architekturproblem als um die Anzahl der Server handelt.

"C10K-Problem" wurde 1999 eingereicht und wie es geht 1 GHz Es ist wie eine Herausforderung, 10.000 Clients gleichzeitig mit einem einzigen Server in einer Netzwerkumgebung aus CPU, 2G-Speicher und 1Gbps einen FTP-Dienst anbieten zu können. Seit 2010 wurde aufgrund der Verbesserung der Hardwareleistung das "C10M-Problem" nach C10K eingereicht. Das C10M-Problem ist ein Problem, das 1 Million gleichzeitige Zugriffe pro Sekunde in einer Netzwerkumgebung mit 8 Kern-CPUs, 64 G-Speicher und 10 Gbit / s verarbeitet.

Kosten und Effizienz sind Probleme aus Sicht der Unternehmensführung, und das Problem C10K / C10M stellt die Hardware vor eine technische Herausforderung. Wenn das C10K / C10M-Problem gelöst werden kann, werden gleichzeitig die Kosten- und Effizienzprobleme gelöst.

2-3. Lösung

Die CPU ist sehr schnell, aber der Kontextwechsel, das Lesen des Speichers, das Lesen der Festplatte und die Netzwerkkommunikation sind sehr langsam. Mit anderen Worten, außerhalb der CPU ist alles außer dem L1-Cache langsam. Ein Computer besteht aus fünf Hauptgeräten, einem Eingabegerät, einem Ausgabegerät, einem Speichergerät, einem Steuergerät und einem Berechnungsgerät. Das Steuergerät und das Berechnungsgerät befinden sich in der CPU, die anderen sind jedoch alle E / A. Das Lesen und Schreiben von Speicher, das Lesen und Schreiben von Festplatten sowie das Lesen und Schreiben auf eine Netzwerkkarte sind alles E / A. I / O ist der größte Engpass.

Asynchrone Programme können effizienter sein, aber der größte Engpass ist E / A. Daher lautet die Lösung "Asynchrone E / A". Es heißt 9D% 9E% E5% 90% 8C% E6% 9C% 9FIO).

3. Der Entwicklungspfad der asynchronen E / A.

Das größte Programm der Welt ist wahrscheinlich das Internet. Aus der Tabelle [CPU time sense](# 2-1-cpu time sense) geht hervor, dass die Netzwerk-E / A langsamer als die Festplatten-E / A ist und den größten Engpass darstellt. Verschiedene asynchrone Frameworks zielen auf Netzwerk-E / A ab, da nichts langsamer ist als Netzwerk-E / A, außer der Neustart des Servers.

Nehmen wir als Beispiel das Schaben. Hier werden wir 10 Webseiten aus dem Internet herunterladen.

3-1. Synchrone Blockierungsmethode

Die einfachste Methode ist das Herunterladen in der richtigen Reihenfolge. Es wird in der Reihenfolge des Aufbaus einer Socket-Verbindung, der Anforderungsübertragung und des Antwortempfangs ausgeführt.

import socket
import time


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def sync_way():
    res = []
    for i in range(10):
        res.append(blocking_way())
    return len(res)


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        sync_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Ausführungsergebnis:

elapsed_time: 2.76[sec]
elapsed_time: 2.58[sec]
elapsed_time: 2.56[sec]
elapsed_time: 2.58[sec]
elapsed_time: 2.85[sec]
elapsed_time: 2.66[sec]
elapsed_time: 2.60[sec]
elapsed_time: 3.38[sec]
elapsed_time: 2.88[sec]
elapsed_time: 2.67[sec]
mean_elapsed_time: 2.75[sec]

Die durchschnittliche Zeit für 10 Mal beträgt 2,75 Sekunden. Die Funktion blocking_way () stellt eine Socket-Verbindung her, sendet eine HTTP-Anfrage, liest die HTTP-Antwort vom Socket und gibt die Daten zurück. Und die Funktion sync_way () hat es gerade 10 Mal wiederholt. Im obigen Code sendet "sock.connect ((" example.com ", 80))" eine Anfrage an die Servernummer 80, und "sock.recv (4096)" enthält 4 KB Byte-Daten von "socket". Lesen.

Wann eine Netzwerkverbindung hergestellt wird, wird nicht von der Clientseite bestimmt, sondern von der Netzwerkumgebung und der Verarbeitungsleistung des Servers. Und es ist unvorhersehbar, wann Daten vom Server zurückgegeben werden. Daher sind standardmäßig "sock.connect ()" und "sock.recv ()" blockiert. Andererseits ist sock.send () lang und blockiert nicht. sock.send () gibt den Rückgabewert zurück, sobald die Anforderungsdaten in den Puffer des TCP / IP-Protokollstapels kopiert wurden, sodass nicht auf eine Antwort vom Server gewartet wird.

Wenn die Netzwerkumgebung sehr schlecht ist und das Herstellen einer Verbindung 1 Sekunde dauert, wird "sock.connect ()" 1 Sekunde lang blockiert. Diese eine Sekunde fühlt sich für eine 2,6-GHz-CPU wie 83 Jahre an. In den letzten 83 Jahren konnte die CPU nichts tun. Ebenso muss sock.recv () warten, bis der Client eine Antwort vom Server erhält. Wiederholen Sie diese Blockierung nach zehnmaligem Herunterladen der example.com-Homepage zehnmal. Aber was ist mit Scraping in großem Maßstab, bei dem täglich 10 Millionen Webseiten heruntergeladen werden?

Zusammenfassend ist die Netzwerk-E / A, wie z. B. das synchrone Blockieren, sehr ineffizient, insbesondere bei Programmen, die häufig kommunizieren. Ein solches Verfahren kann C10K / C10M nicht lösen.

3-2. Verbesserung der synchronen Blockierungsmethode: Mehrprozessmethode

Wenn es einige Zeit dauert, dasselbe Programm zehnmal auszuführen, können Sie dasselbe Programm zehnmal gleichzeitig ausführen. Daher werden wir einen Multi-Prozess einführen. Im Betriebssystem vor Linux 2.4 ist der Prozess übrigens die Entität der Aufgabe, und das Betriebssystem wurde prozessorientiert konzipiert.

import socket
import time
from concurrent.futures import ProcessPoolExecutor


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def multi_process_way():
    with ProcessPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(blocking_way) for i in range(10)}
    return len([future.result() for future in futures])


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        multi_process_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Ausführungsergebnis:

elapsed_time: 0.49[sec]
elapsed_time: 0.47[sec]
elapsed_time: 0.48[sec]
elapsed_time: 0.49[sec]
elapsed_time: 0.54[sec]
elapsed_time: 0.51[sec]
elapsed_time: 0.56[sec]
elapsed_time: 0.52[sec]
elapsed_time: 0.47[sec]
elapsed_time: 0.47[sec]
mean_elapsed_time: 0.50[sec]

Die durchschnittliche Zeit für 10 Mal beträgt 0,50 Sekunden. Es war effektiv. Das Problem ist jedoch, dass es nicht ein Zehntel der synchronen Blockierungsmethode ist. Der Grund ist, dass die CPU der Ausführungsumgebung nicht aus 10 Kernen besteht und Prozesse gewechselt werden müssen.

Das Umschalten von Prozessen ist nicht so günstig wie der in [CPU Time Sense](# 2-1-cpu Time Sense) gezeigte CPU-Kontextwechsel. Wenn die CPU von einem Prozess zu einem anderen wechselt, speichert sie zuerst alle Register- und Speicherzustände der ursprünglichen Prozesslaufzeit und stellt dann den gespeicherten Zustand des anderen Prozesses wieder her. Für die CPU ist es, als würde man ein paar Stunden warten. Wenn die Anzahl der Prozesse größer als die Anzahl der CPU-Kerne ist, müssen die Prozesse gewechselt werden.

Neben dem Umschalten hat Multiprozess einen weiteren Nachteil. Da ein normaler Server in einem stabilen Zustand arbeitet, ist die Anzahl der Prozesse, die gleichzeitig verarbeitet werden können, auf zehn bis Hunderte begrenzt. Zu viele Prozesse können Ihr System instabil machen und die Speicherressourcen ausgehen.

Neben Switching und Small Scale hat Multi-Process Probleme wie Status- und Datenaustausch.

3-3. Weitere Verbesserung der synchronen Blockierungsmethode: Multithread-Methode

Die Datenstruktur von Threads ist leichter als Prozesse und Sie können mehrere Threads innerhalb eines Prozesses haben. Ein neueres Betriebssystem als Linux 2.4 hat auch die minimal planbare Einheit von Prozess zu Thread geändert. Der Prozess existiert nur als Container für Threads und spielt jetzt die Rolle der Ressourcenverwaltung. Threads auf Betriebssystemebene werden auf jeden Kern der CPU verteilt und können gleichzeitig ausgeführt werden.

import socket
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def multi_thread_way():
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(blocking_way) for i in range(10)}
    return len([future.result() for future in futures])


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        multi_thread_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Ausführungsergebnis:

elapsed_time: 0.31[sec]
elapsed_time: 0.32[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.27[sec]
mean_elapsed_time: 0.30[sec]

Die durchschnittliche Zeit für 10 Mal beträgt 0,30 Sekunden. Wie erwartet war es schneller als Multi-Prozess. Multithreading scheint das Problem der langsamen Multiprozess-Prozessumschaltung zu lösen, und der Umfang der Aufgaben, die gleichzeitig verarbeitet werden können, hat sich von Hunderten auf Tausende von Multiprozessen erhöht.

Es gibt jedoch auch ein Problem mit Multithreading. Erstens ist Pythons Multithreading ["GIL"](https://ja.wikipedia.org/wiki/%E3%82%B0%E3%83%AD%E3%83%BC%E3%83%90% E3% 83% AB% E3% 82% A4% E3% 83% B3% E3% 82% BF% E3% 83% 97% E3% 83% AA% E3% 82% BF% E3% 83% AD% E3% Bei Vorhandensein von 83% 83% E3% 82% AF) besteht das Problem, dass der Vorteil einer Mehrkern-CPU nicht genutzt werden kann. Innerhalb eines Python-Prozesses darf jeweils nur ein Thread aktiv sein. Warum war Multithreading schneller als Multiprozess?

Der Grund dafür ist, dass beim Aufrufen eines blockierenden Systemaufrufs wie "sock.connect ()", "sock.recv ()" der aktuelle Thread die GIL freigibt und anderen Threads die Möglichkeit gibt, sie auszuführen. Wenn Sie sich jedoch in einem einzelnen Thread befinden, wird der blockierende Systemaufruf unverändert blockiert.

Bohnenwissen: Pythons time.sleep ist ein Blockierungsprozess, aber in der Multithread-Programmierung blockierttime.sleep ()keine anderen Threads.

Neben GIL gibt es einige häufige Probleme mit Multithreading. Der Thread ist für das Betriebssystem geplant und seine "Planungsstrategie" lautet ["Preemption"](https://ja.wikipedia.org/wiki/%E3%83%97%E3%83%AA%E3 % 82% A8% E3% 83% B3% E3% 83% 97% E3% 82% B7% E3% 83% A7% E3% 83% B3), Threads gleicher Priorität laufen mit gleichen Chancen Ist garantiert zu sein. Preemption ist eine First-Come-First-Served-Strategie. Daher ist nicht vorhersehbar, welcher Thread ausgeführt wird und welcher Code beim nächsten Mal ausgeführt wird, und "Wettbewerbsstatus". Es kann Wiki sein /% E7% AB% B6% E5% 90% 88% E7% 8A% B6% E6% 85% 8B).

Wenn beispielsweise ein Scraping-Worker-Thread die nächste URL abfragt, die aus der Task-Warteschlange entfernt werden soll, stellt sich die Frage, welche übergeben werden soll, wenn mehrere Threads gleichzeitig abrufen. Daher sind Sperren und Synchronisationswarteschlangen erforderlich, um zu verhindern, dass dieselbe Aufgabe mehrmals ausgeführt wird.

Multithreading kann auch Hunderte bis Tausende von Multitasking gleichzeitig verarbeiten, ist jedoch für große und häufige Websysteme immer noch nicht ausreichend. Das größte Problem beim Multithreading ist natürlich immer noch der Konflikt.

3-4 Asynchrone nicht blockierende Methode Schließlich haben wir die nicht blockierende Methode erreicht. Lassen Sie uns zunächst sehen, wie das primitivste Nicht-Blockieren funktioniert.

import socket
import time


def nonblocking_way():
    sock = socket.socket()
    sock.setblocking(False)
    #Weil Socket beim Senden einer nicht blockierenden Verbindungsanforderung einen Fehler sendet
    try:
        sock.connect(('example.com', 80))
    except BlockingIOError:
        pass
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    data = request.encode('ascii')
    #Wiederholen Sie das Senden, da der Socket nicht vorhersagen kann, wann eine Verbindung hergestellt wird
    while 1:
        try:
            sock.send(data)
            break
        except OSError:
            pass

    response = b''
    #Wiederholen Sie den Empfang, da es nicht vorhersehbar ist, wann die Antwort gelesen werden kann
    while 1:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                # blocking
                chunk = sock.recv(4096)
            break
        except OSError:
            pass
    return response


def sync_way():
    res = []
    for i in range(10):
        res.append(nonblocking_way())
    return len(res)


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        sync_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Ausführungsergebnis:

elapsed_time: 2.71[sec]
elapsed_time: 2.82[sec]
elapsed_time: 2.73[sec]
elapsed_time: 2.69[sec]
elapsed_time: 2.82[sec]
elapsed_time: 2.72[sec]
elapsed_time: 2.51[sec]
elapsed_time: 2.65[sec]
elapsed_time: 2.75[sec]
elapsed_time: 3.50[sec]
mean_elapsed_time: 2.79[sec]

Die durchschnittliche Zeit für 10 Mal beträgt 2,79 Sekunden. Ich fühle mich getäuscht. Die Berechnungszeit entspricht der synchronen Blockierung, der Code ist jedoch komplizierter. Sie denken vielleicht, dass Sie keine Nichtblockierung benötigen.

Zunächst wies sock.setblocking (False) im obigen Code das Betriebssystem an, den Aufruf des blockierenden Systems von socket in non-blocking zu ändern. Wie bereits erwähnt, bedeutet das Nicht-Blockieren, wenn Sie eine Sache tun, nicht, dass das Programm, das Sie aufgerufen hat, die andere tut. Der obige Code wird nicht blockiert, nachdem "sock.connect ()" und "sock.recv ()" ausgeführt wurden.

Und der Code ist kompliziert, weil er nicht mehr blockiert. Wenn connect () aufgerufen wird, gibt das Betriebssystem zuerst einen Fehler aus, daher müssen Sie ihn hier mit try abfangen. Und hier, ohne zu blockieren, fahren wir sofort mit dem folgenden Code fort.

Ich wiederhole die while-Anweisung und führe send () aus, weil connect () nicht mehr blockiert und ich nicht weiß, wann die Verbindung hergestellt wird, also muss ich es weiter versuchen. Und selbst wenn "send ()" ausgeführt wird, wird der Aufruf von "recv ()" wiederholt ausgeführt, da nicht bekannt ist, wann die Antwort kommt.

connect () und recv () blockieren das Hauptprogramm nicht mehr, aber die freie CPU-Zeit wird nicht effektiv genutzt. Ich habe diese Zeit damit verbracht, das Lesen und Schreiben von "Socket" zu wiederholen und Fehler in der "while" -Schleife zu behandeln.

Und da noch 10 Downloads der Reihe nach ausgeführt werden, entspricht die Gesamtberechnungszeit der synchronen Blockierungsmethode.

3-5. Verbesserung der asynchronen nicht blockierenden Methode

3-5-1. epoll Wenn die Betriebssystemseite prüft, ob jeder nicht blockierende Anruf bereit ist, muss die Anwendungsseite nicht warten oder in einer Schleife urteilen, indem sie die freie Zeit einer anderen Verarbeitung zuweist Sie können die Effizienz verbessern.

Daher hat das Betriebssystem die Änderung des E / A-Status als Ereignis gekapselt. Zum Beispiel lesbare Ereignisse, beschreibbare Ereignisse und so weiter. Außerdem wurde ein Systemmodul bereitgestellt, mit dem die Anwendung Ereignisbenachrichtigungen empfangen kann. Dieses Modul ist "auswählen". Über select die Anwendung [" File Descriptor "](https://ja.wikipedia.org/wiki/%E3%83%95%E3%82%A1%E3%82%A4%E3%83% AB% E8% A8% 98% E8% BF% B0% E5% AD% 90) und Rückruffunktionen können registriert werden. Wenn sich der Status des Dateideskriptors ändert, ruft select die vorregistrierte Rückruffunktion auf. Diese Methode wird als E / A-Multiplexing bezeichnet.

select wurde später aufgrund des ineffizienten Algorithmus als poll verbessert. Außerdem wurde der BSD-Kernel auf das Modul "kqueue" und der Linux-Kernel auf das Modul "epoll" aktualisiert. Die Funktionalität dieser vier Module ist dieselbe und die verfügbaren APIs sind nahezu identisch. Der Unterschied zwischen "kqueue" und "epoll" besteht darin, dass sie bei der Verarbeitung einer großen Anzahl von Dateideskriptoren effizienter sind als die beiden anderen Module.

Sie hören oft das "epoll" -Modul, weil Linux-Server weit verbreitet sind. Unter der Annahme, dass die Anzahl der Dateideskriptoren $ N $ beträgt, kann select ・ poll mit dem Zeitberechnungsbetrag von $ O (N) $ verarbeitet werden, während epoll mit $ O (1) $ verarbeitet werden kann. Außerdem stellt epoll alle Listening-Dateideskriptoren mit einem speziellen Dateideskriptor bereit. Dieser Dateideskriptor kann von Threads mit mehreren Druckvorgängen gemeinsam genutzt werden.

3-5-2. Rückruf

Ich habe das Abhören von E / A-Ereignissen dem Betriebssystem überlassen. Was sollte das Betriebssystem als Nächstes tun, wenn sich der E / A-Status ändert (z. B. wenn eine Socket-Verbindung hergestellt wird und Daten gesendet werden können)? Es ist ein ** Rückruf **.

Hier müssen wir das Senden und Lesen von Daten in separaten Funktionen zusammenfassen. Wenn "epoll" im Namen der Anwendung den "Socket" -Status abhört, rufen Sie die HTTP-Anforderungssendefunktion auf, wenn der "Socket" -Status für "epoll" beschreibbar wird (Verbindung hergestellt). Wenn der Socket-Status lesbar wird (der Client empfängt die Antwort), rufen Sie die Antwortverarbeitungsfunktion auf. "

Verwenden Sie "epoll" und Rückrufe, um den Scraping-Code umzugestalten.

import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ


class Crawler:
    def __init__(self, path):
        self.path = path
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            paths_todo.remove(self.path)
            if not paths_todo:
                stopped = True

Der Unterschied besteht darin, dass Sie 10 verschiedene Seiten anstelle derselben Seite herunterladen. Der Satz von Seitenpfaden zum Herunterladen von "path_todo" wird später definiert. Werfen wir einen Blick auf die Verbesserungen hier.

Zuerst verschwanden die beiden Schleifen "send ()" und "recv ()".

Dann haben wir das Selektor-Modul eingeführt und eine DefaultSelector-Instanz erstellt. Die Python-Standardbibliothek selectors ist eine Kapselung von select / poll / epoll / kqueue. DefaultSelector wählt je nach Betriebssystem das beste Modul aus. Alle Versionen von Linux 2.5.44 und höher sind "epoll".

Ich habe auch eine Rückruffunktion registriert, die behandelt werden soll, nachdem das beschreibbare Ereignis "EVENT_WRITE" und das lesbare Ereignis "EVENT_READ" von "Socket" aufgetreten sind.

Die Struktur des Codes wurde bereinigt und blockierende Benachrichtigungen werden dem Betriebssystem überlassen. Zum Herunterladen von 10 verschiedenen Seiten benötigen Sie jedoch 10 Crawler-Instanzen, und es treten 20 Ereignisse auf. Wie würden Sie das Ereignis, das gerade aufgetreten ist, von selector erhalten und den entsprechenden Rückruf ausführen?

3-5-3. Ereignisschleife

Die einzige Möglichkeit, das oben genannte Problem zu lösen, besteht darin, die alte Methode anzuwenden. Mit anderen Worten, es ist eine Schleife. Greifen Sie auf das Auswahlmodul zu und warten Sie, bis Sie wissen, welches Ereignis aufgetreten ist und welcher Rückruf aufgerufen werden soll. Diese Benachrichtigungsschleife für wartende Ereignisse wird als Ereignisschleife bezeichnet (https://ja.wikipedia.org/wiki/%E3%82%A4%E3%83%99%E3%83%B3%E3%83]. % 88% E3% 83% AB% E3% 83% BC% E3% 83% 97 #: ~: Text =% E3% 82% A4% E3% 83% 99% E3% 83% B3% E3% 83% 88 % E3% 83% AB% E3% 83% BC% E3% 83% 97% 20 (Ereignis% 20loop)% E3% 80% 81,% E3% 80% 81% E3% 81% 9D% E3% 82% 8C % E3% 82% 89% E3% 82% 92% E3% 83% 87% E3% 82% A3% E3% 82% B9% E3% 83% 91% E3% 83% 83% E3% 83% 81% EF % BC% 88% E9% 85% 8D% E9% 80% 81% EF% BC% 89).

def loop():
    while not stopped:
        #Blockieren, bis ein Ereignis eintritt
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)

Im obigen Code steuert eine globale Variable namens "gestoppt", wann die Ereignisschleife stoppt. Wenn alle path_todos verbraucht sind, ändern Sie stop in True.

Und selector.select () ist ein blockierender Aufruf. Wenn das Ereignis hier nicht auftritt, hat die Anwendung nichts zu behandeln und muss blockiert werden, bis das Ereignis eintritt. Wie zu erwarten ist, kann beim Herunterladen nur einer Webseite auf "connect ()" "send ()" und "recv ()" folgen, sodass die Synchronisationsblockierungsmethode und die Verarbeitungseffizienz gleich sind. .. Der Grund ist, dass selbst wenn Sie nicht mit "connect ()" oder "recv ()" blockieren, es mit "select ()" blockiert wird.

Daher wurde die "Selector" -Funktion (im Folgenden als "epoll / kqueue" bezeichnet) entwickelt, um den parallelen Zugriff in großem Maßstab aufzulösen. Die Selektorfunktion ist am besten, wenn das System eine große Anzahl nicht blockierender Aufrufe enthält und Ereignisse fast zufällig generiert werden können.

Der folgende Code hat 10 Download-Aufgaben erstellt und eine Ereignisschleife gestartet.

if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        selector = DefaultSelector()
        stopped = False
        paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
        start = time.time()
        for path in paths_todo:
            crawler = Crawler(path)
            crawler.fetch()
        loop()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Ausführungsergebnis:

elapsed_time: 0.29[sec]
elapsed_time: 0.27[sec]
elapsed_time: 0.24[sec]
elapsed_time: 0.27[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.32[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.34[sec]
mean_elapsed_time: 0.29[sec]

Die durchschnittliche Zeit für 10 Mal beträgt 0,29 Sekunden. du bist stark. Das Problem des gleichzeitigen Herunterladens von 10 Seiten mit "Ereignisschleife + Rückruf" in einem einzelnen Thread wurde behoben. Dies ist eine asynchrone Programmierung. Es gibt eine "for" -Schleife, es sieht so aus, als würde sie nacheinander "Crawler" -Instanzen erstellen und die "fetch" -Methode aufrufen, aber die "fetch" -Methode behandelt nur "connect ()" und die Ereignisregistrierung. ist. In Bezug auf die Ausführungszeit wurden die Download-Aufgaben eindeutig gleichzeitig ausgeführt.

Asynchrone Verarbeitungsprozedur des obigen Codes:

  1. Erstellen Sie eine Crawler-Instanz
  2. Rufen Sie die Methode "fetch" auf, um eine "Socket" -Verbindung herzustellen und ein beschreibbares Ereignis im "Selector" zu registrieren
  3. fetch hat keinen Blockierungsprozess, kehren Sie also sofort zurück
  4. Wiederholen Sie die obigen Schritte zehnmal, um alle 10 Download-Aufgaben zur Ereignisschleife hinzuzufügen
  5. Rufen Sie eine Ereignisschleife auf, geben Sie die erste Schleife ein und blockieren Sie, um auf Ereignisse zu warten
  6. Wenn eine Download-Task "EVENT_WRITE" auftritt, rufen Sie die "verbunden" -Methode zurück und beenden Sie die erste Schleife.
  7. Geben Sie die zweite Rundschleife ein und führen Sie die Rückruffunktion aus, wenn ein Ereignis in einer Download-Aufgabe auftritt. An diesem Punkt ist nicht vorhersehbar, welches Ereignis eintreten wird und im vorherigen "verbunden" "EVENT_READ" kann auftreten, und andere Aufgaben "EVENT_WRITE" können auftreten (zu diesem Zeitpunkt wird die von einer Aufgabe blockierte Zeit zum Ausführen anderer Aufgaben verwendet).
  8. Die Schleife wird fortgesetzt, bis alle Download-Aufgaben ausgeführt wurden
  9. Verlassen Sie die Schleife und beenden Sie das gesamte Programm

3-5-4. Zusammenfassung

Wir haben von der synchronen Blockierungsmethode zur asynchronen nicht blockierenden Methode gesehen. Und jetzt können Sie mit schwarzer Magie mehrere Netzwerk-E / A-Blockierungsaufgaben in einem einzigen Thread parallel verarbeiten. Im Vergleich zu Multithreading werden nicht einmal Threads gewechselt. Die Rückrufausführung ist ein Funktionsaufruf, der innerhalb des Thread-Stapels abgeschlossen wird. Darüber hinaus ist die Leistung auch hervorragend. Die Anzahl der Aufgaben, die gleichzeitig auf einem einzelnen Server verarbeitet werden können, ist auf Zehntausende bis Hunderttausende gestiegen.

Dies ist das Ende der Unterstützung für die asynchrone Programmierung einiger Programmiersprachen. Ingenieure müssen viel Zeit damit verbringen, "epoll" direkt zu verwenden, um Ereignisse und Rückrufe zu registrieren, Ereignisschleifen zu verwalten und Rückrufe zu entwerfen.

Wie Sie dem bisherigen Inhalt entnehmen können, können Sie bei asynchroner Programmierung unabhängig von der verwendeten Sprache dem obigen Muster "Ereignisschleife + Rückruf" nicht entkommen. Möglicherweise verwenden Sie jedoch nicht "epoll", und es handelt sich möglicherweise nicht um eine "while" -Schleife. Alle von ihnen sind jedoch asynchrone Methoden des Modells "Ich werde Sie später unterrichten".

Warum sehen einige asynchrone Programme keine Rückrufmuster? Wir werden uns das in Zukunft ansehen. Sie haben nicht über Corroutine gesprochen, die Exekutive von Pythons asynchroner Programmierung.

4. Möglichkeiten zur Verbesserung der asynchronen E / A von Python

Von hier aus werde ich erklären, wie das asynchrone Programmier-Ökosystem von Python das oben erwähnte Muster "Ereignisschleife + Rückruf" geerbt hat, und dann sehen, wie es sich zu einem nativen Collout-Muster namens "Asyncio" entwickelt hat. Machen wir das.

4-1. Rückruf Hölle

In [3. Asynchroner E / A-Entwicklungspfad](# 3-Asynchroner io-Entwicklungspfad) haben wir die Grundstruktur von "Ereignisschleife + Rückruf" gesehen, die asynchrone Programmierung mit einem einzelnen Thread realisiert. Sicherlich kann "Ereignisschleife + Rückruf" die Effizienz des Programms erheblich verbessern. Das Problem wurde jedoch noch nicht gelöst. Das eigentliche Projekt kann sehr komplex sein, daher müssen Sie die folgenden Punkte berücksichtigen:

In einem realen Projekt sind diese Probleme unvermeidlich. Und hinter dem Problem stehen die Mängel des Rückrufmusters.

def callback_1():
    #wird bearbeitet
    def callback_2():
        #wird bearbeitet
        def callback_3():
            #wird bearbeitet
            def callback_4():
                #wird bearbeitet
                def callback_5():
                    #wird bearbeitet
                async_function(callback_5)
            async_function(callback_4)
        async_function(callback_3)
    async_function(callback_2)
async_function(callback_1)

(Bitte vergib Lisp-Anhängern)

Beim Schreiben von synchronem Programmcode wird die entsprechende Verarbeitung normalerweise von oben nach unten ausgeführt.

do_a()
do_b()

Wenn "do_b ()" vom Ergebnis von "do_a ()" abhängt und "do_a ()" ein asynchroner Aufruf ist, wissen wir nicht, wann das Ergebnis von "do_a ()" zurückgegeben wird. Die nachfolgende Verarbeitung muss in Form eines Rückrufs an "do_a ()" übergeben werden, der garantiert, dass "do_a ()" abgeschlossen ist, bevor "do_b ()" ausgeführt wird.

do_a(do_b())

Und wenn alle langen Verarbeitungsabläufe asynchron gemacht werden, wird es so sein.

do_a(do_b(do_c(do_d(do_e(do_f(...))))))

Der obige Stil heißt "Callback Hell style". Das Hauptproblem ist jedoch nicht das Aussehen, sondern die Notwendigkeit, die ursprüngliche Struktur von oben nach unten von außen nach innen zu ändern. Zuerst "do_a ()", dann "do_b ()", dann "do_c ()", ... und dann zum innersten "do_f ()". Im synchronen Prozess bedeutet "do_b ()" nach "do_a ()", dass der Befehlszeiger des Threads den Fluss steuert. Wenn es jedoch um Rückrufmuster geht, werden die Flusskontrollen von den Ingenieuren sorgfältig platziert.

Das sock-Objekt in [3-1 Synchronous Version](# 3-1-Synchronous Blocking Method) wird von oben nach unten wiederverwendet, während [3-5 Callback Version](# 3-5-Asynchrone Verbesserung der nicht blockierenden Methode) erfordert, dass Sie die "Crawler" -Klasse instanziieren und das "sock" -Objekt in "self" speichern. Ohne einen objektorientierten Programmierstil müssten Sie den Status, den Sie teilen müssen, an jede Rückruffunktion übergeben, wie z. B. eine Schlagstockberührung. Sie müssen vorausplanen und sorgfältig festlegen, welche Status von mehreren asynchronen Aufrufen gemeinsam genutzt werden sollen.

Eine Reihe von Rückrufen stellt eine Kette von Aufrufen dar („Methodenkette“). Zum Beispiel gibt es eine Kette von "do_a ()" bis "do_f ()". Was denkst du, wenn do_d () einen Fehler verursacht? Die Kette ist gebrochen und Sie verlieren den Zustand, den Stab zu berühren. Und ["Stack Trace"](https://ja.wikipedia.org/wiki/%E3%82%B9%E3%82%BF%E3%83%83%E3%82%AF%E3%83% 88% E3% 83% AC% E3% 83% BC% E3% 82% B9) werden zerstört. Zum Beispiel verursacht "do_d ()" einen Fehler, und der Aufruf von "do_d ()" schlägt in "do_c ()" fehl, so dass "do_c ()" selbst auch einen Fehler verursacht. In ähnlicher Weise verursachen sowohl "do_b ()" als auch "do_a ()" einen Fehler, und das Fehlerprotokoll meldet nur, dass "der Aufruf von" do_a () "einen Fehler verursacht hat". Es war jedoch "do_d ()", das den Fehler tatsächlich verursachte. Um dies zu verhindern, müssen Sie alle Fehler abfangen und die Daten als Rückgabewert der Funktion zurückgeben. Alle Rückruffunktionen müssen den Rückgabewert der vorherigen Funktion überprüfen, wodurch "Hiding Error" verhindert wird.

Während die Lesbarkeit nur eine Frage des Erscheinungsbilds ist, tragen zwei Nachteile, wie das Aufbrechen von Stapelspuren und das Erschweren des Teilens und Verwaltens des Status, zu der außerordentlichen Schwierigkeit der auf Rückrufen basierenden asynchronen Programmierung bei. Jede Programmiersprache versucht, dieses Problem zu lösen. Dank dessen "Promise", "Coroutine" Eine Lösung wie / wiki /% E3% 82% B3% E3% 83% AB% E3% 83% BC% E3% 83% 81% E3% 83% B3) wurde geboren.

4-2. Herausforderungen

Das Muster "Ereignisschleife + Rückruf" löste die Schwierigkeiten der asynchronen Programmierung wie "Wenn die asynchrone Aufgabe abgeschlossen ist" und "Wie wird der Rückgabewert des asynchronen Aufrufs behandelt?". Rückrufe erschweren jedoch das Programm. Bevor wir über Möglichkeiten nachdenken können, um diesen Mangel zu vermeiden, müssen wir zunächst das Wesentliche klären. Warum ist ein Rückruf obligatorisch? Und was ist der Grund für das Teilen und Verwalten des Staates, was überhaupt einer der Nachteile ist?

Staatsteilung und -verwaltung sind notwendig, da das Programm wissen muss, was es getan hat, was es tut und was es tun wird. Mit anderen Worten, das Programm muss seinen aktuellen Status kennen und jeden Rückruf mit einem Schlagstock berühren und an Ort und Stelle halten.

Das Verwalten des Status zwischen mehreren Rückrufen ist schwierig. Warum also nicht jeden Rückruf nur seinen eigenen Status verwalten lassen? Die Anrufkette erschwert die Fehlerbehandlung. Warum also nicht eine Anrufkette verwenden? Wenn Sie jedoch keine Aufrufkette verwenden, woher weiß die aufgerufene Funktion, ob die vorherige Funktion abgeschlossen ist? Wie wäre es also, wenn ein Rückruf den nächsten Rückruf benachrichtigt? Erstens kann ein Rückruf als ausstehende Aufgabe angesehen werden.

Die gegenseitige Benachrichtigung zwischen Aufgaben, die jeder Aufgabe ihren eigenen Status verleiht, ist genau die alte Programmierpraxis "Cooperative Multitasking". Sie müssen es jedoch in einem einzigen Thread planen. Von hier aus können Sie einen Stapelrahmen haben und Ihren Status ["Coroutine"] leicht erkennen (https://ja.wikipedia.org/wiki/%E3%82%B3%E3%83%AB%) E3% 83% BC% E3% 83% 81% E3% 83% B3 #: ~: Text =% E3% 82% B3% E3% 83% AB% E3% 83% BC% E3% 83% 81% E3% 83% B3% EF% BC% 88% E8% 8B% B1% 3A% 20co% 2Droutine,% E5% 8B% 95% E4% BD% 9C% E3% 82% 92% E8% A1% 8C% E3% 81 % 86% E3% 81% 93% E3% 81% A8% E3% 81% AB% E3% 82% 88% E3% 82% 8B% E3% 80% 82) kommt ins Spiel. Natürlich ist es auch möglich, sich zwischen Koruchin gegenseitig zu benachrichtigen.

4-3. Corroutine

Corroutine ist ["Ersatz"](https://ja.wikipedia.org/wiki/%E3%82%B5%E3%83%96%E3%83%AB%E3%83%BC%E3%83%81 Es ist eine Verallgemeinerung von% E3% 83% B3). Die Coroutine-Planungsstrategie lautet ["Nicht präemptiv"](http://e-words.jp/w/%E3%83%8E%E3%83%B3%E3%83%97%E3%83%AA%E3 % 82% A8% E3% 83% B3% E3% 83% 97% E3% 83% 86% E3% 82% A3% E3% 83% 96% E3% 83% 9E% E3% 83% AB% E3% 83 % 81% E3% 82% BF% E3% 82% B9% E3% 82% AF.html #: ~: text =% E3% 83% 8E% E3% 83% B3% E3% 83% 97% E3% 83 % AA% E3% 82% A8% E3% 83% B3% E3% 83% 97% E3% 83% 86% E3% 82% A3% E3% 83% 96% E3% 83% 9E% E3% 83% AB % E3% 83% 81% E3% 82% BF% E3% 82% B9% E3% 82% AF% E3% 81% A8% E3% 81% AF% E3% 80% 81% E4% B8% 80% E3 % 81% A4% E3% 81% AE% E5% 87% A6% E7% 90% 86% E8% A3% 85% E7% BD% AE, CPU% E3% 82% 92% E7% AE% A1% E7 % 90% 86% E3% 81% 97% E3% 81% AA% E3% 81% 84% E6% 96% B9% E5% BC% 8F% E3% 80% 82), mit mehreren Einträgen Sie können das Anhalten und Fortsetzen steuern.

Eine Unterroutine ist ein aufrufbarer Codeblock, der von einer Programmiersprache definiert wird. Mit anderen Worten, es handelt sich um eine Reihe von Anweisungen, die zum Erreichen einer Funktion gepackt sind. In allgemeinen Programmiersprachen werden Unterprogramme durch Strukturen wie Funktionen und Methoden realisiert.

4-4 Generatorbasiertes Collout

Spezielles Objekt in Python ["Generator"](https://ja.wikipedia.org/wiki/%E3%82%B8%E3%82%A7%E3%83%8D%E3%83%AC% E3% 83% BC% E3% 82% BF_ (% E3% 83% 97% E3% 83% AD% E3% 82% B0% E3% 83% A9% E3% 83% 9F% E3% 83% B3% E3 Es gibt% 82% B0)). Die Eigenschaften des Generators ähneln denen des Coroutums, er kann während einer Iteration unterbrochen werden und verliert seinen vorherigen Zustand erst bei der nächsten Iteration.

Der Generator wurde in Python 2.5 (PEP 342) erweitert, um einfache Kollouts im Generator zu ermöglichen. Der vorgeschlagene Titel für diese Erweiterung lautet "Coroutinen über erweiterte Generatoren". Dank PEP 342 kann der Generator jetzt die Ausführung bei "Yield" unterbrechen und Daten zurückgeben. Sie können auch "Senden" verwenden, um Daten an den Generator zu senden, und "Werfen", um einen Fehler im Generator zu verursachen und ihn zu beenden.

Überarbeiten Sie dann das Scraping-Programm mit einem generatorbasierten Collout.

4-4-1. Zukünftiges Objekt

Woher kennen Sie das Ergebnis eines asynchronen Aufrufs, wenn Sie die Rückrufmethode beenden? Hier definieren wir zuerst das Objekt. Wenn das Ergebnis des asynchronen Aufrufs zurückgegeben wird, speichern Sie es darin. Dieses Objekt wird als "Future" -Objekt bezeichnet.

import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ


class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

Das "Future" -Objekt hat eine "result" -Instanzvariable und speichert zukünftige Ausführungsergebnisse. Und die Methode "set_result" setzt das "Ergebnis" und führt nach dem Binden des Werts an das "Ergebnis" die Rückruffunktion aus, die dem "Future" -Objekt vorab hinzugefügt wurde. Rückruffunktionen können mit der Methode add_done_callback () hinzugefügt werden.

Haben Sie nicht gesagt, dass Sie die Rückrufmethode beenden? Keine Panik. Wie in [Hier](# 3-5-4-Zusammenfassung) erläutert, können Sie bei asynchroner Programmierung nicht aus dem Muster "Ereignisschleife + Rückruf" entkommen. Und der Rückruf hier unterscheidet sich ein wenig vom vorherigen.

4-4-2. Refactoring Crawler

Wie auch immer, ich habe ein "Future" -Objekt erstellt, das zukünftige Daten darstellt. Lassen Sie uns den Scraping-Code mit dem Objekt "Future" umgestalten.

class Crawler:
    def __init__(self, path):
        self.path = path
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connected():
            f.set_result(None)

        # fileno()Die Methode gibt den Socket-Dateideskriptor als kurzen Integer-Typ zurück
        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield f
        selector.unregister(sock.fileno())
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        sock.send(get.encode('ascii'))

        global stopped
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            #Erhalten Sie das gesendete Ergebnis
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                paths_todo.remove(self.path)
                if not paths_todo:
                    stopped = True
                break

Im Vergleich zur vorherigen Callback-Version macht es einen großen Unterschied. Ich habe den Ausdruck "Yield" für die Methode "Fetch" verwendet und daraus einen Generator gemacht. Der Generator kann einmal mit "next ()" oder mit "send (None)" gestartet werden und wird unterbrochen, wenn er "ield "erreicht. Wie startet der "Fetch" -Generator neu?

4-4-3. Aufgabenobjekt

Sie müssen eine Regel befolgen, um das oben genannte Problem zu lösen. Es ist das "Prinzip der Einzelverantwortung" (https://en.wikipedia.org/wiki/Single-responsibility_principle). Daher werden wir hier etwas erstellen, das die Rolle des Neustarts des Generators und der Verwaltung seines Status spielt. Nennen Sie es "Aufgabe".

class Task:
    def __init__(self, coro):
        #Corroutine-Objekt
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            #Beim Senden läuft der Generator bis zur nächsten Ausbeute
            # next_Zukunft ist das Objekt, das durch Ausbeute zurückgegeben wird
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)

Das "Task" -Objekt im obigen Code packt ein "Coro" -Koroutinenobjekt. Da es sich bei der zu verwaltenden Aufgabe um eine ausstehende Aufgabe (Coroutine) handelt, wird "coro" hier zu einem "Abruf" -Generator. Und es gibt eine "Schritt" -Methode, die bei der Initialisierung einmal ausgeführt wird. Die "step" -Methode ruft die "send ()" -Methode des Generators auf und wird bei der Initialisierung zu "send (None)", sodass die anfängliche Iteration von "coro", dh "fetch ()", ausgeführt wird.

Nachdem "send ()" abgeschlossen ist, erhalten Sie die nächste "Zukunft" und verwenden "add_done_callback", um der nächsten "Zukunft" eine Rückruffunktion namens "step ()" hinzuzufügen.

Schauen wir uns als nächstes den Generator fetch () an. Geschäftslogik wie das Senden einer Anfrage und das Empfangen einer Antwort wird intern abgeschlossen. Und die in selector registrierte Rückruffunktion ist ebenfalls einfach geworden. Die beiden "Renditen" geben die entsprechenden "Zukunfts" zurück und empfangen sie in "Task.step ()". Sie haben jetzt erfolgreich "Task", "Future" und "Coroutine" verbunden.

Initialisiert das "Task" -Objekt und "fetch ()" läuft bis zum ersten "Yield". Wie bekommst du es zurück?

4-4-4. Ereignisschleife

Die Ereignisschleife ist zurück. Wenn Sie die erste Ausbeute erreicht haben, warten Sie, bis das registrierte "EVENT_WRITE" auftritt. Die Ereignisschleife bewegt sich wie ein Herzschlag weiter, sobald sie zu pulsieren beginnt.

def loop():
    while not stopped:
        #Blockieren, bis ein Ereignis eintritt
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        selector = DefaultSelector()
        stopped = False
        paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
        start = time.time()
        for path in paths_todo:
            crawler = Crawler(path)
            Task(crawler.fetch())
        loop()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Ausführungsergebnis:

elapsed_time: 0.30[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.26[sec]
mean_elapsed_time: 0.27[sec]

Die durchschnittliche Zeit für 10 Mal beträgt 0,27 Sekunden. Dieses Mal unterscheidet sich loop ein wenig von vorher. callback () empfängt nicht mehr event_key und event_mask. Mit anderen Worten, der Rückruf hier muss nicht wissen, wer das Ereignis ausgelöst hat. Er kann in Kombination mit fetch () angezeigt werden, aber der Rückruf setzt den Wert nur mitset_result ()auf future. Sie müssen es nur installieren. Und Sie müssen nicht wissen, um welche "Zukunft" es sich handelt, das Collout kann seinen eigenen Zustand retten, und es ist in Ordnung, wenn Sie Ihre "Zukunft" kennen. Sie müssen sich keine Gedanken über das Festlegen eines Werts machen, das Collout erledigt alles für Sie.

4-4-5. Zusammenfassung von "Generator Collout Style VS Callback Style"

Rückrufstil:

Generator-Collout-Stil:

4-4-6. Den Code weiter überarbeiten

Der Code ist etwas schwer zu lesen. Was soll ich hier tun, wenn ich aufgefordert werde, die Fehlerresistenz und Funktionalität von "Abrufen" zu verbessern? Außerdem sind technische Logik ("Socket" -bezogen) und Geschäftslogik (Anforderungs- und Antwortverarbeitung) gemischt, was nicht gut ist.

An diesen Stellen gibt es jedoch einen "Ertrag", und wenn Sie ihn abstrahieren möchten, müssen Sie ihn zu einem Generator machen. Außerdem ist fetch () selbst ein Generator, und das Spielen mit dem Generator im Generator kann den Code noch komplizierter machen.

Die Python-Designer bemerkten dieses Problem ebenfalls und stellten ein Spielzeug zur Verfügung, um mit dem Generator in einem Generator namens "Yield from" zu spielen.

4-5. Verbessertes generatorbasiertes Collout mit Ausbeute von

4-5-1. Ausbeute aus Einführung der Grammatik

yield from ist eine Syntax, die aus Python 3.3 (PEP 380) eingeführt wurde. Der PEP 380 soll in erster Linie die Unannehmlichkeiten beim Spielen mit dem Generator im Generator beseitigen und hat zwei Funktionen.

Eine davon ist, dass Sie den Subgenerator nicht durch Drehen der Iteration "nachgeben" müssen, sondern direkt aus "nachgeben" können. Die folgenden zwei Arten von Generatoren haben eine äquivalente Funktion.

def gen_1():
    sub_gen = range(10)
    yield from sub_gen


def gen_2():
    subgen = range(10)
    for item in subgen:
        yield item

Das andere ist die Fähigkeit, Kommunikationskanäle zwischen dem Untergenerator und dem Hauptgenerator zu öffnen.

def gen():
    yield from sub_gen()


def sub_gen():
    while 1:
        x = yield
        yield x + 1


def main():
    g = gen()
    next(g)  #Bis zur ersten Ausbeute laufen lassen
    retval = g.send(1)  # gen()Scheint Daten zu senden, aber tatsächlich sub_gen()Senden an
    print(retval)  # sub_gen()Gibt die berechneten 2 aus aus
    g.throw(StopIteration)  # sub_gen()Verursachen Sie einen Fehler bei

Der obige Code zeigt die gegenseitige Kommunikationsfunktion von "Ausbeute von". yield from öffnet einen Kommunikationskanal innerhalb vongen ()zwischensub_gen ()undmain (). Daten "1" können direkt von "main ()" nach "sub_gen ()" gesendet werden, und der berechnete Wert "2" kann direkt von "sub_gen ()" nach "main ()" zurückgegeben werden. .. Sie können "sub_gen ()" auch beenden, indem Sie einen Fehler direkt von "main ()" an "sub_gen ()" senden.

Übrigens kann "Ertrag von" nicht nur "Ertrag von " sein, sondern auch "Ertrag von ".

4-5-2. Refactoring

Abstrakte Verbindung Socket

def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())

Laden der abstrakten Antwort

def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk


def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b''.join(response)

Refactoring Crawler

class Crawler:
    def __init__(self, path):
        self.path = path
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ('example.com', 80))
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        paths_todo.remove(self.path)
        if not paths_todo:
            stopped = True

Der Code bis zu diesem Punkt ist in Ordnung. Der wiederverwendbare Teil wird als Funktion abstrahiert. Der Wert des Untergenerators kann auch mit "Ausbeute von" erhalten werden. Zu beachten ist jedoch, dass wir bei der Rückgabe eines "zukünftigen" Objekts "Ertrag von" anstelle von "Ertrag" verwenden. "Yield" kann auf gewöhnliche Python-Objekte angewendet werden, nicht jedoch auf "Yield from". Jetzt müssen wir die "Zukunft" ändern, um sie zu einem "iterierbaren" Objekt zu machen.

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self
        return self.result

Ich habe gerade "iter" hinzugefügt. Natürlich müssen Sie nicht unbedingt "Ertrag von" verwenden, aber Sie können es als "Ertrag" belassen. Es ist jedoch besser, sie richtig zu verwenden, um zu unterscheiden, ob sie generatorbasierte Kollouts oder nur Generatoren haben. Daher war es nach der Einführung von "Yield from" aus Python 3.3 veraltet, Collouts mit "Yield" zu erstellen. Es besteht auch der Vorteil, dass Daten zwischen Kollouts frei gesendet werden können, indem die gegenseitige Kommunikationsfunktion "Ausbeute von" verwendet wird.

4-5-3. Zusammenfassung der Collout-Verbesserung durch Ausbeute aus

Durch die Verbesserung des Collouts durch "Yield from" konnten wir den Grad der Codeabstraktion erhöhen und geschäftslogikbezogene Angelegenheiten weiter vereinfachen. Die Interkommunikationsfunktion erleichtert den Datenaustausch zwischen Coroutinen. Damit hat die asynchrone Programmierung von Python große Fortschritte gemacht.

Und die Entwickler der Python-Sprache nutzten auch "Ausbeute von" voll aus. Das asynchrone Python-Programmierframework "Tulip" unter der Leitung von Guido hat sich ebenfalls mit enormer Geschwindigkeit weiterentwickelt, in Python 3.4 in "asyncio" umbenannt und vorläufig (standardmäßig) als Standardbibliothek übernommen.

4-6. asyncio

4-6-1. Einführung von Asyncio

asyncio ist ein asynchrones E / A-Framework (PEP 3156), das experimentell aus Python 3.4 eingeführt wurde. asyncio wurde in Python als Infrastruktur für die asynchrone E / A-Programmierung von Cortine bereitgestellt. Die Kernkomponenten bestehen aus Event Loop, Coroutine, Task, Future und anderen Zusatzmodulen.

Als "asyncio" eingeführt wurde, wurde auch ein Dekorateur namens "@ asyncio.coroutine" bereitgestellt. Sie können es als Collout markieren, indem Sie es an eine Funktion anhängen, die "Yield from" verwendet, aber Sie sind nicht gezwungen, es zu verwenden.

Mit Hilfe von "Yield from" aus Python 3.4 ist es einfacher, Corouties zu erstellen, aber wie bei historischen Problemen haben die Menschen eine Unterscheidung und Beziehung zwischen ** Generatoren ** und ** Coroutinen **. Ich verstehe nicht. Und ich kenne den Unterschied zwischen "Ertrag" und "Ertrag von" nicht. Diese Verwirrung verstößt gegen die Regeln von "Python Zen".

Ab Python 3.5 haben Python-Designer die Syntax "async / await" (PEP 492) hinzugefügt. , Zeigte eine explizite Unterstützung für Corroutine. Dies wird als ** natives Collout ** bezeichnet. Die beiden Collout-Stile "async / await" und "yield from" haben dieselbe interne Implementierung und sind miteinander kompatibel.

Und seit Python 3.6 ist asyncio offiziell der Standardbibliothek beigetreten. Das Obige ist die Entwicklungsbahn der asynchronen E / A in CPython.

4-6-2. Asyncio und native Collout

Erleben Sie die Bequemlichkeit der Syntax "asyncio" und "async / await".

import asyncio
import aiohttp
import time


loop = asyncio.get_event_loop()


async def fetch(path):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(path) as response:
            response = await response.read()
            return response


if __name__ == '__main__':
    host = 'http://example.com'
    paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        tasks = [fetch(host + path) for path in paths_todo]
        loop.run_until_complete(asyncio.gather(*tasks))
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Ausführungsergebnis:

elapsed_time: 0.27[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
mean_elapsed_time: 0.26[sec]

Im Vergleich zum generatorbasierten Collout-Stil ist der "Asyncio" -Stil ganz anders.

Der Grund, warum Sie "Socket" nicht selbst betreiben, um HTTP-Anforderungen zu senden und Antworten zu empfangen, ist, dass es im tatsächlichen Geschäft äußerst schwierig ist, das HTTP-Protokoll gut zu handhaben. Wenn Sie einen asynchronen HTTP-Client mit voller Funktionalität haben, können Sie dies selbst tun. Weil du nicht musst.

Im Vergleich zur synchronen Blockierungsversion des Codes:

Zusammenfassung

Wir haben uns die Entwicklung und Mechanik der asynchronen Programmierung in Python genauer angesehen. Am Ende haben wir mit Code, der so einfach wie die synchrone Verarbeitung ist, eine N-mal höhere Effizienz erzielt. Und es hat nicht die Nachteile der Rückrufhölle.   Weitere Artikel zur Verwendung von "Asyncio", seinen Stärken und Schwächen und zur Unterscheidung von anderen asynchronen E / A-Lösungen im Python-Ökosystem "Asyncio".

Referenz

A Web Crawler With asyncio Coroutines Latency numbers every programmer should know

Recommended Posts

Vollständiges Verständnis der asynchronen Python-Programmierung
Vollständiges Verständnis der objektorientierten Programmierung von Python
Vollständiges Verständnis der Funktion numpy.pad
Memo der kollektiven Wissensprogrammierung verstehen
Ein grobes Verständnis von Python-Feuer und ein Memo
Hinweis zur Verwendung der Python-Eingabefunktion
Memorandum zu Djangos QueryDict
[Python] Ein grobes Verständnis des Protokollierungsmoduls
[Python] Ein grobes Verständnis von Iterablen, Iteratoren und Generatoren
[PyTorch] Ein wenig Verständnis von CrossEntropyLoss mit mathematischen Formeln
Asynchrone Verarbeitung von Python ~ Asynchron vollständig verstehen und warten ~
Asynchrone Programmierung mit libev # 2
[Hinweis] Beginn der Programmierung
Rekrutierung von Programmiermeistern
Simulation von Comp Gacha
elasticsearch_dsl Memorandum
Asynchrone Programmierung mit libev
Asynchrone Programmierung mit libev # 3
Erstellen Sie sofort ein Diagramm mit 2D-Daten mit der matplotlib von Python
[Für Anfänger] Eine Wortzusammenfassung der gängigen Programmiersprachen (Version 2018)