Le module concurrent.futures ajouté dans Python 3.2 offre la possibilité d'exécuter plusieurs processus en parallèle.
Il existe d'autres modules en Python appelés threading et multiprocessing, mais ceux-ci gèrent un processus de thread, tandis que le module concurrent.futures vise à gérer plusieurs processus de thread. est.
Executor
Le module concurrent.futures a une classe Executor comme classe abstraite, et deux classes sont fournies comme classes d'implémentation. Utilisez l'un des deux pour effectuer des tâches parallèles.
max_workers
Dans le constructeur Executor, le nombre maximum de tâches pouvant être exécutées en même temps est spécifié par l'argument max_workers. Si vous demandez à exécuter plus de tâches que ce qui peut être exécuté simultanément, les tâches seront ajoutées à la file d'attente, en attendant que les autres tâches se terminent, puis s'exécutent de manière séquentielle.
Executor dispose des méthodes suivantes pour exécuter des tâches parallèles.
Ceci est un exemple utilisant concurrent.futures. Il est écrit en Python3.6, mais je pense qu'il peut être utilisé si l'environnement est 3.2 ou supérieur avec une petite modification.
01_thread.py Effectuez 5 tâches dans 2 threads. La tâche est juste de dormir pendant 1 seconde.
python
def task(v):
getLogger().info("%s start", v)
time.sleep(1.0)
getLogger().info("%s end", v)
def main():
init_logger()
getLogger().info("main start")
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
for i in range(5):
executor.submit(task, i)
getLogger().info("submit end")
getLogger().info("main end")
[2017-04-02 12:01:39,747] [MainThread] main start
[2017-04-02 12:01:39,748] [thread_0] 0 start
[2017-04-02 12:01:39,749] [thread_1] 1 start
[2017-04-02 12:01:39,750] [MainThread] submit end
[2017-04-02 12:01:40,755] [thread_0] 0 end
[2017-04-02 12:01:40,755] [thread_0] 2 start
[2017-04-02 12:01:40,756] [thread_1] 1 end
[2017-04-02 12:01:40,756] [thread_1] 3 start
[2017-04-02 12:01:41,761] [thread_0] 2 end
[2017-04-02 12:01:41,761] [thread_0] 4 start
[2017-04-02 12:01:41,761] [thread_1] 3 end
[2017-04-02 12:01:42,764] [thread_0] 4 end
[2017-04-02 12:01:42,765] [MainThread] main end
Voici un exemple de réception du résultat de [soumettre].
def task(v):
getLogger().info("%s start", v)
time.sleep(1.0)
getLogger().info("%s end", v)
return v * 2
def main():
init_logger()
getLogger().info("main start")
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
futures = []
for i in range(5):
futures.append(executor.submit(task, i))
getLogger().info("submit end")
getLogger().info([f.result() for f in futures])
getLogger().info("main end")
[2017-04-02 12:08:23,853] [MainThread] main start
[2017-04-02 12:08:23,854] [thread_0] 0 start
[2017-04-02 12:08:23,855] [thread_1] 1 start
[2017-04-02 12:08:23,856] [MainThread] submit end
[2017-04-02 12:08:24,856] [thread_0] 0 end
[2017-04-02 12:08:24,856] [thread_0] 2 start
[2017-04-02 12:08:24,857] [thread_1] 1 end
[2017-04-02 12:08:24,857] [thread_1] 3 start
[2017-04-02 12:08:25,863] [thread_0] 2 end
[2017-04-02 12:08:25,864] [thread_0] 4 start
[2017-04-02 12:08:25,864] [thread_1] 3 end
[2017-04-02 12:08:26,867] [thread_0] 4 end
[2017-04-02 12:08:26,868] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:08:26,868] [MainThread] main end
03_map.py Si vous souhaitez traiter des tâches dans un lot, il est plus facile d'écrire en utilisant map plutôt que submit.
def task(v):
getLogger().info("%s start", v)
time.sleep(1.0)
getLogger().info("%s end", v)
return v * 2
def main():
init_logger()
getLogger().info("main start")
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
results = executor.map(task, range(5))
getLogger().info("map end")
getLogger().info(list(results))
getLogger().info("main end")
[2017-04-02 12:10:03,997] [MainThread] main start
[2017-04-02 12:10:03,998] [thread_0] 0 start
[2017-04-02 12:10:04,000] [thread_1] 1 start
[2017-04-02 12:10:04,000] [MainThread] map end
[2017-04-02 12:10:05,005] [thread_0] 0 end
[2017-04-02 12:10:05,006] [thread_0] 2 start
[2017-04-02 12:10:05,006] [thread_1] 1 end
[2017-04-02 12:10:05,006] [thread_1] 3 start
[2017-04-02 12:10:06,007] [thread_0] 2 end
[2017-04-02 12:10:06,007] [thread_0] 4 start
[2017-04-02 12:10:06,007] [thread_1] 3 end
[2017-04-02 12:10:07,014] [thread_0] 4 end
[2017-04-02 12:10:07,014] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:10:07,014] [MainThread] main end
04_process.py Enfin, un exemple utilisant Process Pool Executor. Modifions les paramètres et voyons la différence de performances.
def task(params):
(v, num_calc) = params
a = float(v)
for _ in range(num_calc):
a = pow(a, a)
return a
def main():
init_logger()
if len(sys.argv) != 5:
print("usage: 05_process.py max_workers chunk_size num_tasks num_calc")
sys.exit(1)
(max_workers, chunk_size, num_tasks, num_calc) = map(int, sys.argv[1:])
start = time()
with ProcessPoolExecutor(max_workers=max_workers) as executor:
params = map(lambda _: (random(), num_calc), range(num_tasks))
results = executor.map(task, params, chunksize=chunk_size)
getLogger().info(sum(results))
getLogger().info("{:.3f}".format(time() - start))
Les quatre premiers sont les résultats de l'exécution d'un grand nombre de grandes tâches et les quatre derniers sont les résultats de l'exécution d'un grand nombre de petites tâches. Il semble que la spécification de chunk_size soit importante lors de l'exécution d'un grand nombre de petites tâches.
max_workers | chunk_size | num_tasks | num_calc | Temps d'exécution(sec) |
---|---|---|---|---|
1 | 1 | 100 | 100,000 | 1.954 |
2 | 1 | 100 | 100,000 | 1.042 |
1 | 10 | 100 | 100,000 | 1.922 |
2 | 10 | 100 | 100,000 | 1.071 |
1 | 1 | 10,000 | 1,000 | 3.295 |
2 | 1 | 10,000 | 1,000 | 3.423 |
1 | 10 | 10,000 | 1,000 | 2.272 |
2 | 10 | 10,000 | 1,000 | 1.279 |
1 | 100 | 10,000 | 1,000 | 2.126 |
2 | 100 | 10,000 | 1,000 | 1.090 |