Das in Python 3.2 hinzugefügte Modul concurrent.futures bietet die Möglichkeit, mehrere Prozesse parallel auszuführen.
Es gibt andere Module in Python, die als Threading und Multiprocessing bezeichnet werden. Diese behandeln jedoch einen Thread-Prozess, während das Modul concurrent.futures mehrere Thread-Prozesse verarbeiten soll. ist.
Executor
Das Modul concurrent.futures verfügt über eine Executor -Klasse als abstrakte Klasse, und zwei Klassen werden als Implementierungsklassen bereitgestellt. Verwenden Sie eine der beiden Optionen, um parallele Aufgaben auszuführen.
max_workers
Im Executor-Konstruktor wird die maximale Anzahl von Aufgaben, die gleichzeitig ausgeführt werden können, durch das Argument max_workers angegeben. Wenn Sie mehr Aufgaben zum Ausführen anfordern, als gleichzeitig ausgeführt werden können, werden die Aufgaben zur Warteschlange hinzugefügt, warten auf den Abschluss der anderen Aufgaben und werden dann nacheinander ausgeführt.
Executor verfügt über die folgenden Methoden, um parallele Aufgaben auszuführen.
Dies ist ein Beispiel mit concurrent.futures. Es ist in Python3.6 geschrieben, aber ich denke, dass es mit einer kleinen Änderung betrieben werden kann, wenn die Umgebung 3.2 oder höher ist.
01_thread.py Führen Sie 5 Aufgaben in 2 Threads aus. Die Aufgabe besteht darin, nur 1 Sekunde zu schlafen.
python
def task(v):
getLogger().info("%s start", v)
time.sleep(1.0)
getLogger().info("%s end", v)
def main():
init_logger()
getLogger().info("main start")
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
for i in range(5):
executor.submit(task, i)
getLogger().info("submit end")
getLogger().info("main end")
[2017-04-02 12:01:39,747] [MainThread] main start
[2017-04-02 12:01:39,748] [thread_0] 0 start
[2017-04-02 12:01:39,749] [thread_1] 1 start
[2017-04-02 12:01:39,750] [MainThread] submit end
[2017-04-02 12:01:40,755] [thread_0] 0 end
[2017-04-02 12:01:40,755] [thread_0] 2 start
[2017-04-02 12:01:40,756] [thread_1] 1 end
[2017-04-02 12:01:40,756] [thread_1] 3 start
[2017-04-02 12:01:41,761] [thread_0] 2 end
[2017-04-02 12:01:41,761] [thread_0] 4 start
[2017-04-02 12:01:41,761] [thread_1] 3 end
[2017-04-02 12:01:42,764] [thread_0] 4 end
[2017-04-02 12:01:42,765] [MainThread] main end
Dies ist ein Beispiel für den Empfang des Ergebnisses von submit.
def task(v):
getLogger().info("%s start", v)
time.sleep(1.0)
getLogger().info("%s end", v)
return v * 2
def main():
init_logger()
getLogger().info("main start")
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
futures = []
for i in range(5):
futures.append(executor.submit(task, i))
getLogger().info("submit end")
getLogger().info([f.result() for f in futures])
getLogger().info("main end")
[2017-04-02 12:08:23,853] [MainThread] main start
[2017-04-02 12:08:23,854] [thread_0] 0 start
[2017-04-02 12:08:23,855] [thread_1] 1 start
[2017-04-02 12:08:23,856] [MainThread] submit end
[2017-04-02 12:08:24,856] [thread_0] 0 end
[2017-04-02 12:08:24,856] [thread_0] 2 start
[2017-04-02 12:08:24,857] [thread_1] 1 end
[2017-04-02 12:08:24,857] [thread_1] 3 start
[2017-04-02 12:08:25,863] [thread_0] 2 end
[2017-04-02 12:08:25,864] [thread_0] 4 start
[2017-04-02 12:08:25,864] [thread_1] 3 end
[2017-04-02 12:08:26,867] [thread_0] 4 end
[2017-04-02 12:08:26,868] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:08:26,868] [MainThread] main end
03_map.py Wenn Sie Aufgaben in einem Stapel verarbeiten möchten, ist das Schreiben mit map einfacher als mit submit.
def task(v):
getLogger().info("%s start", v)
time.sleep(1.0)
getLogger().info("%s end", v)
return v * 2
def main():
init_logger()
getLogger().info("main start")
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
results = executor.map(task, range(5))
getLogger().info("map end")
getLogger().info(list(results))
getLogger().info("main end")
[2017-04-02 12:10:03,997] [MainThread] main start
[2017-04-02 12:10:03,998] [thread_0] 0 start
[2017-04-02 12:10:04,000] [thread_1] 1 start
[2017-04-02 12:10:04,000] [MainThread] map end
[2017-04-02 12:10:05,005] [thread_0] 0 end
[2017-04-02 12:10:05,006] [thread_0] 2 start
[2017-04-02 12:10:05,006] [thread_1] 1 end
[2017-04-02 12:10:05,006] [thread_1] 3 start
[2017-04-02 12:10:06,007] [thread_0] 2 end
[2017-04-02 12:10:06,007] [thread_0] 4 start
[2017-04-02 12:10:06,007] [thread_1] 3 end
[2017-04-02 12:10:07,014] [thread_0] 4 end
[2017-04-02 12:10:07,014] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:10:07,014] [MainThread] main end
04_process.py Zum Schluss ein Beispiel mit Process Pool Executor. Lassen Sie uns die Parameter ändern und den Unterschied in der Leistung sehen.
def task(params):
(v, num_calc) = params
a = float(v)
for _ in range(num_calc):
a = pow(a, a)
return a
def main():
init_logger()
if len(sys.argv) != 5:
print("usage: 05_process.py max_workers chunk_size num_tasks num_calc")
sys.exit(1)
(max_workers, chunk_size, num_tasks, num_calc) = map(int, sys.argv[1:])
start = time()
with ProcessPoolExecutor(max_workers=max_workers) as executor:
params = map(lambda _: (random(), num_calc), range(num_tasks))
results = executor.map(task, params, chunksize=chunk_size)
getLogger().info(sum(results))
getLogger().info("{:.3f}".format(time() - start))
Die oberen vier sind das Ergebnis der Ausführung einer großen Anzahl großer Aufgaben und die unteren vier sind das Ergebnis der Ausführung einer großen Anzahl kleiner Aufgaben. Es scheint, dass die Angabe von chunk_size wichtig ist, wenn eine große Anzahl kleiner Aufgaben ausgeführt wird.
max_workers | chunk_size | num_tasks | num_calc | Ausführungszeit(sec) |
---|---|---|---|---|
1 | 1 | 100 | 100,000 | 1.954 |
2 | 1 | 100 | 100,000 | 1.042 |
1 | 10 | 100 | 100,000 | 1.922 |
2 | 10 | 100 | 100,000 | 1.071 |
1 | 1 | 10,000 | 1,000 | 3.295 |
2 | 1 | 10,000 | 1,000 | 3.423 |
1 | 10 | 10,000 | 1,000 | 2.272 |
2 | 10 | 10,000 | 1,000 | 1.279 |
1 | 100 | 10,000 | 1,000 | 2.126 |
2 | 100 | 10,000 | 1,000 | 1.090 |
Recommended Posts