The concurrent.futures module, added in Python 3.2, provides a high-level interface for executing callables in parallel.
Python also has the lower-level threading and multiprocessing modules, which deal with individual threads and processes; concurrent.futures builds on top of them to manage pools of threads or processes through a common interface.
Executor
The concurrent.futures module defines Executor as an abstract class, along with two concrete implementations: ThreadPoolExecutor and ProcessPoolExecutor. Use one of the two to run parallel tasks.
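As a minimal sketch, both are created the same way and expose the same Executor interface:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# Threads share memory and suit I/O-bound tasks (the GIL limits CPU parallelism).
io_executor = ThreadPoolExecutor(max_workers=2)

# Processes sidestep the GIL and suit CPU-bound tasks.
cpu_executor = ProcessPoolExecutor(max_workers=2)
```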
max_workers
The Executor constructor's max_workers argument sets the maximum number of tasks that can run at the same time. If you submit more tasks than that, the excess tasks are queued and run in turn as earlier tasks finish.
Executor provides two methods for running parallel tasks: submit(), which schedules a single callable and returns a Future, and map(), which applies a callable to every element of an iterable.
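For example:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() schedules a single call and returns a Future immediately
    future = executor.submit(pow, 2, 10)
    print(future.result())  # blocks until done -> 1024
    # map() works like the built-in map, but runs the calls on the pool
    print(list(executor.map(abs, [-1, -2, -3])))  # -> [1, 2, 3]
```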
Below are samples using concurrent.futures. They were written for Python 3.6, but should run on 3.2 or later with minor changes (for example, thread_name_prefix was added in 3.6).
01_thread.py runs 5 tasks on 2 threads. Each task just sleeps for 1 second.
```python
import time
from concurrent.futures import ThreadPoolExecutor
from logging import INFO, basicConfig, getLogger

def init_logger():
    # Minimal stand-in for the article's init_logger (not shown in the original);
    # the format matches the log output below.
    basicConfig(level=INFO, format="[%(asctime)s] [%(threadName)s] %(message)s")

def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        for i in range(5):
            executor.submit(task, i)
        getLogger().info("submit end")
    getLogger().info("main end")

main()
```
```
[2017-04-02 12:01:39,747] [MainThread] main start
[2017-04-02 12:01:39,748] [thread_0] 0 start
[2017-04-02 12:01:39,749] [thread_1] 1 start
[2017-04-02 12:01:39,750] [MainThread] submit end
[2017-04-02 12:01:40,755] [thread_0] 0 end
[2017-04-02 12:01:40,755] [thread_0] 2 start
[2017-04-02 12:01:40,756] [thread_1] 1 end
[2017-04-02 12:01:40,756] [thread_1] 3 start
[2017-04-02 12:01:41,761] [thread_0] 2 end
[2017-04-02 12:01:41,761] [thread_0] 4 start
[2017-04-02 12:01:41,761] [thread_1] 3 end
[2017-04-02 12:01:42,764] [thread_0] 4 end
[2017-04-02 12:01:42,765] [MainThread] main end
```
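Note that "submit end" is logged right away because submit() does not block, while "main end" appears only after all five tasks finish: leaving the with block calls executor.shutdown(wait=True).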
This is an example of receiving the results of submit: each call returns a Future, and Future.result() blocks until the task's return value is available.
```python
# imports and init_logger are the same as in 01_thread.py

def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)
    return v * 2

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        futures = []
        for i in range(5):
            # submit() returns a Future for each task
            futures.append(executor.submit(task, i))
        getLogger().info("submit end")
    # Future.result() blocks until the task's return value is ready
    getLogger().info([f.result() for f in futures])
    getLogger().info("main end")

main()
```
```
[2017-04-02 12:08:23,853] [MainThread] main start
[2017-04-02 12:08:23,854] [thread_0] 0 start
[2017-04-02 12:08:23,855] [thread_1] 1 start
[2017-04-02 12:08:23,856] [MainThread] submit end
[2017-04-02 12:08:24,856] [thread_0] 0 end
[2017-04-02 12:08:24,856] [thread_0] 2 start
[2017-04-02 12:08:24,857] [thread_1] 1 end
[2017-04-02 12:08:24,857] [thread_1] 3 start
[2017-04-02 12:08:25,863] [thread_0] 2 end
[2017-04-02 12:08:25,864] [thread_0] 4 start
[2017-04-02 12:08:25,864] [thread_1] 3 end
[2017-04-02 12:08:26,867] [thread_0] 4 end
[2017-04-02 12:08:26,868] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:08:26,868] [MainThread] main end
```
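Iterating the futures list yields results in submission order. If you would rather handle each result as soon as its task finishes, the module also provides as_completed; a minimal sketch reusing task and the logging setup from above:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
    futures = [executor.submit(task, i) for i in range(5)]
    for f in as_completed(futures):
        # yields each Future as it finishes, regardless of submission order
        getLogger().info(f.result())
```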
03_map.py If you want to apply the same task to a batch of inputs, map is simpler to write than submit.
```python
# imports and init_logger are the same as in 01_thread.py

def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)
    return v * 2

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        # map() schedules one task per element and returns a lazy iterator
        results = executor.map(task, range(5))
        getLogger().info("map end")
    # consuming the iterator yields the results in input order
    getLogger().info(list(results))
    getLogger().info("main end")

main()
```
```
[2017-04-02 12:10:03,997] [MainThread] main start
[2017-04-02 12:10:03,998] [thread_0] 0 start
[2017-04-02 12:10:04,000] [thread_1] 1 start
[2017-04-02 12:10:04,000] [MainThread] map end
[2017-04-02 12:10:05,005] [thread_0] 0 end
[2017-04-02 12:10:05,006] [thread_0] 2 start
[2017-04-02 12:10:05,006] [thread_1] 1 end
[2017-04-02 12:10:05,006] [thread_1] 3 start
[2017-04-02 12:10:06,007] [thread_0] 2 end
[2017-04-02 12:10:06,007] [thread_0] 4 start
[2017-04-02 12:10:06,007] [thread_1] 3 end
[2017-04-02 12:10:07,014] [thread_0] 4 end
[2017-04-02 12:10:07,014] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:10:07,014] [MainThread] main end
```
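Notice that "map end" is logged before any task finishes: map() returns a lazy iterator immediately, and the results become available, in input order, only when list(results) consumes it.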
04_process.py Finally, an example using ProcessPoolExecutor. This task is CPU-bound, and CPython's GIL prevents threads from running Python bytecode in parallel, so processes are the right tool here. Let's change the parameters and see how performance differs.
```python
import sys
from concurrent.futures import ProcessPoolExecutor
from logging import INFO, basicConfig, getLogger
from random import random
from time import time

def init_logger():
    basicConfig(level=INFO, format="[%(asctime)s] [%(threadName)s] %(message)s")

def task(params):
    (v, num_calc) = params
    a = float(v)
    for _ in range(num_calc):
        a = pow(a, a)
    return a

def main():
    init_logger()
    if len(sys.argv) != 5:
        print("usage: {} max_workers chunk_size num_tasks num_calc".format(sys.argv[0]))
        sys.exit(1)
    (max_workers, chunk_size, num_tasks, num_calc) = map(int, sys.argv[1:])
    start = time()
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        params = map(lambda _: (random(), num_calc), range(num_tasks))
        # chunksize batches the inputs sent to each worker process
        results = executor.map(task, params, chunksize=chunk_size)
        getLogger().info(sum(results))
    getLogger().info("{:.3f}".format(time() - start))

# the guard is required: ProcessPoolExecutor workers re-import this module
if __name__ == "__main__":
    main()
```
The top four rows run a small number of large tasks; the remaining six run a large number of small tasks. Specifying chunk_size matters mostly in the latter case: each chunk is sent to a worker process in a single round trip, so larger chunks amortize the inter-process communication overhead across many small tasks.
max_workers | chunk_size | num_tasks | num_calc | Execution time (sec) |
---|---|---|---|---|
1 | 1 | 100 | 100,000 | 1.954 |
2 | 1 | 100 | 100,000 | 1.042 |
1 | 10 | 100 | 100,000 | 1.922 |
2 | 10 | 100 | 100,000 | 1.071 |
1 | 1 | 10,000 | 1,000 | 3.295 |
2 | 1 | 10,000 | 1,000 | 3.423 |
1 | 10 | 10,000 | 1,000 | 2.272 |
2 | 10 | 10,000 | 1,000 | 1.279 |
1 | 100 | 10,000 | 1,000 | 2.126 |
2 | 100 | 10,000 | 1,000 | 1.090 |