Parallele Taskausführung mit concurrent.futures in Python

Übersicht über das Modul concurrent.futures

Das in Python 3.2 hinzugefügte Modul concurrent.futures bietet die Möglichkeit, mehrere Prozesse parallel auszuführen.

Es gibt andere Module in Python, die als Threading und Multiprocessing bezeichnet werden. Diese behandeln jedoch einen Thread-Prozess, während das Modul concurrent.futures mehrere Thread-Prozesse verarbeiten soll. ist.

Executor

Das Modul concurrent.futures verfügt über eine Executor -Klasse als abstrakte Klasse, und zwei Klassen werden als Implementierungsklassen bereitgestellt. Verwenden Sie eine der beiden Optionen, um parallele Aufgaben auszuführen.

max_workers

Im Executor-Konstruktor wird die maximale Anzahl von Aufgaben, die gleichzeitig ausgeführt werden können, durch das Argument max_workers angegeben. Wenn Sie mehr Aufgaben zum Ausführen anfordern, als gleichzeitig ausgeführt werden können, werden die Aufgaben zur Warteschlange hinzugefügt, warten auf den Abschluss der anderen Aufgaben und werden dann nacheinander ausgeführt.

Methoden zum Ausführen von Aufgaben senden und zuordnen

Executor verfügt über die folgenden Methoden, um parallele Aufgaben auszuführen.

Stichprobe

Dies ist ein Beispiel mit concurrent.futures. Es ist in Python3.6 geschrieben, aber ich denke, dass es mit einer kleinen Änderung betrieben werden kann, wenn die Umgebung 3.2 oder höher ist.

Ein einfaches Beispiel mit ThreadPoolExecutor

01_thread.py Führen Sie 5 Aufgaben in 2 Threads aus. Die Aufgabe besteht darin, nur 1 Sekunde zu schlafen.

python


def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        for i in range(5):
            executor.submit(task, i)
        getLogger().info("submit end")
    getLogger().info("main end")
[2017-04-02 12:01:39,747] [MainThread] main start
[2017-04-02 12:01:39,748] [thread_0] 0 start
[2017-04-02 12:01:39,749] [thread_1] 1 start
[2017-04-02 12:01:39,750] [MainThread] submit end
[2017-04-02 12:01:40,755] [thread_0] 0 end
[2017-04-02 12:01:40,755] [thread_0] 2 start
[2017-04-02 12:01:40,756] [thread_1] 1 end
[2017-04-02 12:01:40,756] [thread_1] 3 start
[2017-04-02 12:01:41,761] [thread_0] 2 end
[2017-04-02 12:01:41,761] [thread_0] 4 start
[2017-04-02 12:01:41,761] [thread_1] 3 end
[2017-04-02 12:01:42,764] [thread_0] 4 end
[2017-04-02 12:01:42,765] [MainThread] main end

Erhalten Sie Ergebnisse mit Senden

02_future.py

Dies ist ein Beispiel für den Empfang des Ergebnisses von submit.

def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)
    return v * 2

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        futures = []
        for i in range(5):
            futures.append(executor.submit(task, i))
        getLogger().info("submit end")
        getLogger().info([f.result() for f in futures])
    getLogger().info("main end")
[2017-04-02 12:08:23,853] [MainThread] main start
[2017-04-02 12:08:23,854] [thread_0] 0 start
[2017-04-02 12:08:23,855] [thread_1] 1 start
[2017-04-02 12:08:23,856] [MainThread] submit end
[2017-04-02 12:08:24,856] [thread_0] 0 end
[2017-04-02 12:08:24,856] [thread_0] 2 start
[2017-04-02 12:08:24,857] [thread_1] 1 end
[2017-04-02 12:08:24,857] [thread_1] 3 start
[2017-04-02 12:08:25,863] [thread_0] 2 end
[2017-04-02 12:08:25,864] [thread_0] 4 start
[2017-04-02 12:08:25,864] [thread_1] 3 end
[2017-04-02 12:08:26,867] [thread_0] 4 end
[2017-04-02 12:08:26,868] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:08:26,868] [MainThread] main end

Beispiel für das Hinzufügen von Aufgaben im Stapel / das Erfassen von Ergebnissen im Stapel mit Karte

03_map.py Wenn Sie Aufgaben in einem Stapel verarbeiten möchten, ist das Schreiben mit map einfacher als mit submit.

def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)
    return v * 2

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        results = executor.map(task, range(5))
        getLogger().info("map end")
    getLogger().info(list(results))
    getLogger().info("main end")
[2017-04-02 12:10:03,997] [MainThread] main start
[2017-04-02 12:10:03,998] [thread_0] 0 start
[2017-04-02 12:10:04,000] [thread_1] 1 start
[2017-04-02 12:10:04,000] [MainThread] map end
[2017-04-02 12:10:05,005] [thread_0] 0 end
[2017-04-02 12:10:05,006] [thread_0] 2 start
[2017-04-02 12:10:05,006] [thread_1] 1 end
[2017-04-02 12:10:05,006] [thread_1] 3 start
[2017-04-02 12:10:06,007] [thread_0] 2 end
[2017-04-02 12:10:06,007] [thread_0] 4 start
[2017-04-02 12:10:06,007] [thread_1] 3 end
[2017-04-02 12:10:07,014] [thread_0] 4 end
[2017-04-02 12:10:07,014] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:10:07,014] [MainThread] main end

Beispiel für die Ausführung einer umfangreichen Verarbeitung mit ProcessPoolExecutor

04_process.py Zum Schluss ein Beispiel mit Process Pool Executor. Lassen Sie uns die Parameter ändern und den Unterschied in der Leistung sehen.

def task(params):
    (v, num_calc) = params
    a = float(v)
    for _ in range(num_calc):
        a = pow(a, a)
    return a

def main():
    init_logger()

    if len(sys.argv) != 5:
        print("usage: 05_process.py max_workers chunk_size num_tasks num_calc")
        sys.exit(1)
    (max_workers, chunk_size, num_tasks, num_calc) = map(int, sys.argv[1:])

    start = time()

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        params = map(lambda _: (random(), num_calc), range(num_tasks))
        results = executor.map(task, params, chunksize=chunk_size)
    getLogger().info(sum(results))

    getLogger().info("{:.3f}".format(time() - start))

Parameterbeschreibung

Ausführungsumgebung

Ausführungsergebnis

Die oberen vier sind das Ergebnis der Ausführung einer großen Anzahl großer Aufgaben und die unteren vier sind das Ergebnis der Ausführung einer großen Anzahl kleiner Aufgaben. Es scheint, dass die Angabe von chunk_size wichtig ist, wenn eine große Anzahl kleiner Aufgaben ausgeführt wird.

max_workers chunk_size num_tasks num_calc Ausführungszeit(sec)
1 1 100 100,000 1.954
2 1 100 100,000 1.042
1 10 100 100,000 1.922
2 10 100 100,000 1.071
1 1 10,000 1,000 3.295
2 1 10,000 1,000 3.423
1 10 10,000 1,000 2.272
2 10 10,000 1,000 1.279
1 100 10,000 1,000 2.126
2 100 10,000 1,000 1.090

Recommended Posts

Parallele Taskausführung mit concurrent.futures in Python
Paralleler Download mit Python
Führen Sie Python unittest parallel aus
Externe Befehlsausführung in Python
Übersetzt mit Googletrans in Python
Verwenden des Python-Modus in der Verarbeitung
Python-Memorandum zur parallelen / asynchronen Ausführung
GUI-Programmierung in Python mit Appjar
Vorsichtsmaßnahmen bei der Verwendung von Pit mit Python
Messen Sie die Ausführungszeit von Funktionen in Python
Verwendung globaler Variablen in Python-Funktionen
Mal sehen, wie man Eingaben in Python verwendet
Führen Sie das Python-Unittest-Modul in vs2017 aus
Gesamtleistung in Python (mit Funktools)
Handschriftliche Zeichenerkennung mit KNN in Python
Einfache parallele Ausführung mit Python-Unterprozess
Versuchen Sie es mit LeapMotion mit Python
Suche nach Tiefenpriorität mit Stack in Python
Bei Verwendung regulärer Ausdrücke in Python
GUI-Erstellung in Python mit tkinter 2
Mausbedienung mit Windows-API in Python
Hinweise zur Verwendung von cChardet und python3-chardet in Python 3.3.1.
Versuchen Sie es mit der Wunderlist-API in Python
GUI-Erstellung in Python mit tkinter Teil 1
Holen Sie sich Suica Balance in Python (mit libpafe)
Verwalten Sie mehrere Ausführungsumgebungen mit Python venv
Hash-Passwörter langsam mit bcrypt in Python
Versuchen Sie, die Kraken-API mit Python zu verwenden
Verwenden von venv in der Windows + Docker-Umgebung [Python]
Tweet mit der Twitter-API in Python
[Python] [Windows] Serielle Kommunikation in Python über DLL
Melden Sie sich mit Anforderungen in Python bei Slack an
Holen Sie sich Youtube-Daten in Python mithilfe der Youtube-Daten-API
Verwenden physikalischer Konstanten in Python scipy.constants ~ Konstante e ~
Scraping von Websites mit JavaScript in Python
Entwicklung eines Slack Bot mit Python mit chat.postMessage
Zeichnen Sie mit graphviz eine Baumstruktur in Python 3
Hinweise zur Verwendung von Python (Pydev) mit Eclipse
Krankheitsklassifizierung durch Random Forest mit Python
Laden Sie Dateien in jedem Format mit Python herunter
Benachrichtigen Sie mit Notification Center, wenn die Ausführungsumgebung in Python macOS ist
Quadtree in Python --2
Python in der Optimierung
CURL in Python
Metaprogrammierung mit Python
Python 3.3 mit Anaconda
Geokodierung in Python
SendKeys in Python
Erstellen Sie eine GIF-Datei mit Pillow in Python
Verwendung mehrerer Argumente bei der Parallelverarbeitung mithilfe der Mehrfachverarbeitung in Python
Metaanalyse in Python
E-Mail-Anhänge über Ihr Google Mail-Konto mit Python.
Unittest in Python
Erstellen eines Nummerierungsprozesses mit Python im lokalen DynamoDB-Nummerierungsprozess
Versuchen Sie es mit der BitFlyer Ligntning API in Python
Erhalten Sie Tastenanschläge während der Hintergrundausführung in Python (Windows)
Epoche in Python
Zwietracht in Python