[Python] Ich habe eine Praxis untersucht, die durch asynchrone Verarbeitung (Multiprocessing, Asyncio) parallel zum Hauptthread ausgeführt werden kann.

Ich habe eine Praxis der asynchronen Python-Verarbeitung in Betracht gezogen, die die folgenden Anforderungen erfüllt.

Der Code ist etwas redundant, aber alle Quellen, einschließlich der Logger, werden aufgelistet, damit Sie ihn so kopieren und einfügen können, wie er ist.

Fall 1 Hauptthread und asynchroner Thread werden parallel verarbeitet (mithilfe von Multiprocessing).

multiprocessing1.py


from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging

#Holen Sie sich Logger
def get_logger():
    logger = logging.getLogger("multiprocesssing_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

def async_func(name, sleep_time):
    #Thread-ID abrufen
    thread_id = th.get_ident()

    logger.info(f"thread_id:{thread_id} name:{name} async_starte func")
    time.sleep(sleep_time)
    logger.info(f"thread_id:{thread_id} name:{name} async_Ende der Funktion")

    return f"{thread_id}-{name}"

if __name__ == "__main__":
    #Erstellen Sie einen Thread-Pool für die Thread-Ausführung
    #Maximale gleichzeitige Threads in Prozessen
    pool = ThreadPool(processes=1)

    #Thread-ID abrufen
    thread_id = th.get_ident()

    #Führen Sie die asynchrone Verarbeitung aus. Geben Sie das Funktionsobjekt als erstes Argument und das Argument als zweites Argument an.
    logger.info(f"thread_id:{thread_id}Rufen Sie die asynchrone Verarbeitung von main auf")
    future = pool.apply_async(async_func, ("Faden 1", 10))

    #Verarbeitung, die Sie parallel zur asynchronen Verarbeitung im Hauptthread ausführen möchten
    logger.info(f"thread_id:{thread_id}main Startet die Verarbeitung während der asynchronen Verarbeitung")
    time.sleep(5)
    logger.info(f"thread_id:{thread_id}main Ende der Verarbeitung während der asynchronen Verarbeitung")

    #Warten Sie, bis der asynchrone Prozess abgeschlossen ist, und erhalten Sie das Ergebnis.
    result = future.get()
    logger.info(f"thread_id:{thread_id}Holen Sie sich das Ergebnis der asynchronen Verarbeitung:{result}")
    pool.close()

Ausführungsergebnis

2020-10-15 16:43:27,073 - thread_id:18440 Asynchrone Verarbeitung von main aufrufen
2020-10-15 16:43:27,074 - thread_id:18440 main Startet die Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:43:27,074 - thread_id:18132 name:Thread 1 asynchron_starte func
2020-10-15 16:43:32,074 - thread_id:18440 main Ende der Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:43:37,075 - thread_id:18132 name:Thread 1 asynchron_Ende der Funktion
2020-10-15 16:43:37,075 - thread_id:18440 Ruft das Ergebnis der asynchronen Verarbeitung ab:18132-Faden 1

Aus dem Protokoll können Sie ersehen, dass die Prozesse "Asynchrone Hauptverarbeitung läuft" und "async_func start" parallel um 16:43:27 ausgeführt werden.

Fall 2 Hauptthread und mehrere asynchrone Threads werden parallel verarbeitet (mithilfe von Multiprocessing).

multiprocessing2.py


from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging

#Holen Sie sich Logger
def get_logger():
    logger = logging.getLogger("multiprocesssing_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

def async_func(name, sleep_time):
    #Thread-ID abrufen
    thread_id = th.get_ident()

    logger.info(f"thread_id:{thread_id} name:{name} async_starte func")
    time.sleep(sleep_time)
    logger.info(f"thread_id:{thread_id} name:{name} async_Ende der Funktion")

    return f"{thread_id}-{name}"

if __name__ == "__main__":
    #Erstellen Sie einen Thread-Pool für die Thread-Ausführung
    #Maximale gleichzeitige Threads in Prozessen
    pool = ThreadPool(processes=5)

    #Thread-ID abrufen
    thread_id = th.get_ident()

    #Führen Sie die asynchrone Verarbeitung aus. Geben Sie das Funktionsobjekt als erstes Argument und das Argument als zweites Argument an.
    logger.info(f"thread_id:{thread_id}Rufen Sie die asynchrone Verarbeitung von main auf")
    futures = []
    for i in range(5):
        future = pool.apply_async(async_func, (f"Faden{i + 1}", 10)) # Tuple of args for foo
        futures.append(future)

    #Verarbeitung, die Sie parallel zur asynchronen Verarbeitung im Hauptthread ausführen möchten
    logger.info(f"thread_id:{thread_id}main Startet die Verarbeitung während der asynchronen Verarbeitung")
    time.sleep(5)
    logger.info(f"thread_id:{thread_id}main Ende der Verarbeitung während der asynchronen Verarbeitung")

    #Warten Sie, bis der asynchrone Prozess abgeschlossen ist, und erhalten Sie das Ergebnis.
    results = [future.get() for future in futures]
    logger.info(f"thread_id:{thread_id}Holen Sie sich das Ergebnis der asynchronen Verarbeitung:{results}")
    pool.close()

Ausführungsergebnis

2020-10-15 16:47:41,977 - thread_id:13448 Asynchrone Verarbeitung von main aufrufen
2020-10-15 16:47:41,978 - thread_id:13448 main Startet die Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:47:41,979 - thread_id:23216 name:Thread 1 asynchron_starte func
2020-10-15 16:47:41,979 - thread_id:21744 name:Thread 2 asynchron_starte func
2020-10-15 16:47:41,979 - thread_id:21708 name:Thread 3 asynchron_starte func
2020-10-15 16:47:41,979 - thread_id:21860 name:Thread 4 asynchron_starte func
2020-10-15 16:47:41,979 - thread_id:22100 name:Thread 5 asynchron_starte func
2020-10-15 16:47:46,980 - thread_id:13448 main Ende der Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:47:51,982 - thread_id:21744 name:Thread 2 asynchron_Ende der Funktion
2020-10-15 16:47:51,982 - thread_id:23216 name:Thread 1 asynchron_Ende der Funktion
2020-10-15 16:47:51,983 - thread_id:21708 name:Thread 3 asynchron_Ende der Funktion
2020-10-15 16:47:51,984 - thread_id:21860 name:Thread 4 asynchron_Ende der Funktion
2020-10-15 16:47:51,984 - thread_id:22100 name:Thread 5 asynchron_Ende der Funktion
2020-10-15 16:47:51,986 - thread_id:13448 Ruft das Ergebnis der asynchronen Verarbeitung ab:['23216-Faden 1', '21744-Faden 2', '21708-Faden 3', '21860-Su
Rot 4', '22100-Faden 5']

Aus dem Protokoll können Sie ersehen, dass um 16:47:41 Uhr fünf Prozesse, "Hauptasynchroner Prozess in Bearbeitung" und "async_func start", gleichzeitig parallel ausgeführt werden. Wenn Sie die Anzahl der Prozesse mithilfe von "ThreadPool (Prozesse = 3)" usw. reduzieren, werden zuerst 3 Threads ausgeführt, 2 befinden sich im Wartezustand und nach Abschluss wird ein neuer Thread ausgeführt.

Fall 3 Verarbeiten Sie den Hauptthread und mehrere asynchrone Threads parallel (mithilfe von Asyncio).

asyncio1.py


import asyncio
import itertools
import time
import profile
import random
import time
import threading as th
import logging

#Holen Sie sich Logger
def get_logger():
    logger = logging.getLogger("asyncio_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

#Holen Sie sich so etwas wie Task-ID
#* Da asyncio intern einen Generator verwendet
#Die Thread-ID ist dieselbe, und die Erfassungsmethode der ID, die der asynchronen Verarbeitung entspricht, lautet wie folgt.
_next_id = itertools.count().__next__
def get_task_id():
    return _next_id()

async def async_func(name, sleep_time):
    #Aufgaben-ID abrufen
    task_id = get_task_id()

    logger.info(f"task_id:{task_id} name:{name} async_starte func")
    await asyncio.sleep(sleep_time)
    logger.info(f"task_id:{task_id} name:{name} async_Ende der Funktion")

    return f"{task_id}-{name}"

async def async_func_caller():
    #Aufgaben-ID abrufen
    task_id = get_task_id()

    #Generieren Sie eine asynchrone Verarbeitungsaufgabe
    #* Zu diesem Zeitpunkt wird die Aufgabe nur generiert und nicht ausgeführt.
    # loop.run_until_Wird ausgeführt, wenn der Aufruf abgeschlossen ist.
    futures = [asyncio.ensure_future(async_func(f"task{i + 1}", 10)) for i in range(5)]

    #Verarbeitung, die Sie parallel zur asynchronen Verarbeitung im Hauptthread ausführen möchten
    logger.info(f"task_id:{task_id} async_func_Anrufer Startet die Verarbeitung während der asynchronen Verarbeitung")
    await asyncio.sleep(5)
    logger.info(f"task_id:{task_id} async_func_Anrufer Ende der Verarbeitung während der asynchronen Verarbeitung")

    #Warten Sie, bis der asynchrone Prozess abgeschlossen ist, und erhalten Sie das Ergebnis.
    results = await asyncio.gather(*futures)

    return results


if __name__ == "__main__":
    #Erstellen Sie einen Thread-Pool für die asynchrone Verarbeitung
    loop = asyncio.get_event_loop()

    logger.info(f"main Startet die Verarbeitung während der asynchronen Verarbeitung")

    #Führen Sie die asynchrone Verarbeitung aus und warten Sie bis zum Ende
    ret = loop.run_until_complete(async_func_caller())
    logger.info(f"main Ende der Verarbeitung während der asynchronen Verarbeitung Ergebnis:{ret}")
    loop.close()

Ausführungsergebnis

2020-10-15 16:49:40,132 -main Startet die Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:49:40,134 - task_id:0 async_func_Anrufer Startet die Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:49:40,134 - task_id:1 name:task1 async_starte func
2020-10-15 16:49:40,135 - task_id:2 name:task2 async_starte func
2020-10-15 16:49:40,135 - task_id:3 name:task3 async_starte func
2020-10-15 16:49:40,136 - task_id:4 name:task4 async_starte func
2020-10-15 16:49:40,136 - task_id:5 name:task5 async_starte func
2020-10-15 16:49:45,138 - task_id:0 async_func_Anrufer Ende der Verarbeitung während der asynchronen Verarbeitung
2020-10-15 16:49:50,141 - task_id:2 name:task2 async_Ende der Funktion
2020-10-15 16:49:50,142 - task_id:5 name:task5 async_Ende der Funktion
2020-10-15 16:49:50,142 - task_id:4 name:task4 async_Ende der Funktion
2020-10-15 16:49:50,144 - task_id:1 name:task1 async_Ende der Funktion
2020-10-15 16:49:50,144 - task_id:3 name:task3 async_Ende der Funktion
2020-10-15 16:49:50,145 -main Ende der Verarbeitung während der asynchronen Verarbeitung Ergebnis:['1-task1', '2-task2', '3-task3', '4-task4', '5-task5']

Aus dem Protokoll können Sie ersehen, dass um 16:49:40 Uhr fünf Prozesse, "Asynchroner Hauptprozess in Bearbeitung" und "Start von async_func", gleichzeitig parallel ausgeführt werden.

Wir hoffen, dass dies bei der Implementierung der asynchronen Verarbeitung hilfreich ist.

Recommended Posts

[Python] Ich habe eine Praxis untersucht, die durch asynchrone Verarbeitung (Multiprocessing, Asyncio) parallel zum Hauptthread ausgeführt werden kann.
Ich habe die Jumbo-Lotterie zum Jahresende mit Python gekauft und analysiert, die in Colaboratory ausgeführt werden kann
Erhalten Sie eine Liste der Ergebnisse der Parallelverarbeitung in Python mit Starmap
Die Geschichte, dass sendmail, die im Terminal ausgeführt werden kann, mit cron nicht funktioniert hat
Python3-Verarbeitung, die in Paiza verwendbar zu sein scheint
Goroutine (parallele Steuerung), die im Feld eingesetzt werden kann
Ich habe die Vorbehandlung untersucht, die mit PyCaret durchgeführt werden kann
Eine Funktion, die die Verarbeitungszeit einer Methode in Python misst
Ich habe ein Shuffle gemacht, das mit Python zurückgesetzt (zurückgesetzt) werden kann
[Python] Ein Programm, das die maximale Anzahl von Spielzeugen findet, die mit Ihrem Geld gekauft werden können
Artikel, der eine Person sein kann, die den Mechanismus der API versteht und beherrscht (mit Python-Code)
Python-Übungsdatenanalyse Zusammenfassung des Lernens, dass ich ungefähr 10 mit 100 Schlägen getroffen habe
Ich habe Pygame mit Python 3.5.1 in der Umgebung von pyenv unter OS X installiert
Ich möchte eine Prioritätswarteschlange erstellen, die mit Python (2.7) aktualisiert werden kann.
Ich habe PyQCheck, eine Bibliothek, die QuickCheck mit Python ausführen kann, in PyPI registriert.
Ich habe versucht, die Pferde vorherzusagen, die mit LightGBM unter den Top 3 sein werden
Asynchrone Verarbeitung in Python: Asyncio-Reverse-Referenz
Zeigen Sie das Ergebnis der Geometrieverarbeitung in Python an
Parallele Verarbeitung ohne tiefe Bedeutung in Python
Liste der Tools, mit denen Sie auf einfache Weise die Emotionsanalyse japanischer Sätze mit Python ausprobieren können (versuchen Sie es mit Google Colab).
Zusammenfassung der statistischen Datenanalysemethoden mit Python, die im Geschäftsleben verwendet werden können
Visualisierung von geografischen Informationen von R und Python, die von Power BI ausgedrückt werden können
Hier ist eine, ich werde die mit "künstlicher Intelligenz" ausgestatteten Anwendungen zusammenfassen, an denen ich interessiert war
[Python] Einführung in das WEB-Scraping | Zusammenfassung der Methoden, die mit dem Webdriver verwendet werden können
In Python3.8 und höher kann der inverse Mod mit der integrierten Funktion pow berechnet werden.
Ich habe versucht, die Verarbeitungsgeschwindigkeit mit dplyr von R und pandas von Python zu vergleichen
Hinweise zu Python-Kenntnissen, die mit AtCoder verwendet werden können
Ein Memo, dass ich den Datenspeicher mit Python berührt habe
Kann mit AtCoder verwendet werden! Eine Sammlung von Techniken zum Zeichnen von Kurzcode in Python!
Ich schrieb einen Test in "Ich habe versucht, die Wahrscheinlichkeit eines Bingospiels mit Python zu simulieren".
[Python] Ein Programm, um die Anzahl der Äpfel und Orangen zu ermitteln, die geerntet werden können
Parallelverarbeitung mit Mehrfachverarbeitung
Formatübersicht der Formate, die mit gensim serialisiert werden können
Ich habe versucht, die Entropie des Bildes mit Python zu finden
Versuchen Sie, COVID-19 Tokyo-Daten mit Python zu kratzen
Ich habe versucht, das Bild mit Python + OpenCV "gammakorrektur" zu machen
Warum kann ich das Modul durch Importieren mit Python verwenden?
Berechnen Sie mit Python Millionen von Stellen in der Quadratwurzel von 2
Ich habe die grundlegende Grammatik von Python in Jupyter Lab geschrieben
Ich habe die Strategie des Aktiensystemhandels mit Python evaluiert.
Ich habe verschiedene Methoden der Kommunikation zwischen Prozessen bei der Mehrfachverarbeitung von Python3 gemessen
[Homologie] Zählen Sie mit Python die Anzahl der Löcher in den Daten
Goroutine, die im Feld verwendet werden kann (errgroup.Group Edition)
Skripte, die bei der Verwendung von Bottle in Python verwendet werden können
Bewertungsindex, der für GridSearchCV von sklearn angegeben werden kann
Ich habe versucht, es zu erweitern, damit die Datenbank mit der Analysesoftware von Wiire verwendet werden kann
Verstehen Sie die Wahrscheinlichkeiten und Statistiken, die für das Fortschrittsmanagement mit einem Python-Programm verwendet werden können
Python Priority Queue Das Geheimnis des Ergebnisses der Heapify-Funktion, an dem die meisten Menschen nicht interessiert wären
Eine Geschichte, die nicht funktioniert hat, als ich versucht habe, mich mit dem Python-Anforderungsmodul anzumelden
Sagen Sie mit Word2Vec + Random Forest die Anzahl der Kissen voraus, die als Lachbefragte empfangen werden können
Die Geschichte des Erstellens eines Bots, der aktive Mitglieder in einem bestimmten Slack-Kanal mit Python anzeigt
Ich habe versucht, so viel wie möglich über GIL herauszufinden, das Sie wissen sollten, wenn Sie parallel mit Python arbeiten