Parallel task execution using concurrent.futures in Python

Overview of concurrent.futures module

The concurrent.futures module, added in Python 3.2, provides a high-level interface for running multiple tasks in parallel on a pool of threads or processes.

Python also has the threading and multiprocessing modules, but those deal with individual threads and processes, while the concurrent.futures module aims to manage a pool of workers that handles many tasks.

Executor

The concurrent.futures module defines Executor as an abstract class and provides two implementation classes: ThreadPoolExecutor, which runs tasks in threads, and ProcessPoolExecutor, which runs tasks in separate processes. Use either of the two to run parallel tasks.
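As a small sketch of what "implementation classes" means here: both concrete classes derive from Executor, so code written against the abstract interface should accept either one. The helper name run_squares is hypothetical and only used for illustration.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def square(x):
    return x * x


def run_squares(executor: Executor, values):
    """Hypothetical helper: works with any Executor implementation."""
    return list(executor.map(square, values))


# Both concrete classes derive from the abstract Executor class.
is_thread_executor = issubclass(ThreadPoolExecutor, Executor)
is_process_executor = issubclass(ProcessPoolExecutor, Executor)

with ThreadPoolExecutor(max_workers=2) as executor:
    squares = run_squares(executor, [1, 2, 3])
```

Passing a ProcessPoolExecutor to the same helper should work as well, provided the script is run with an `if __name__ == "__main__":` guard.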

max_workers

In the constructor of each Executor implementation, the max_workers argument specifies the maximum number of tasks that can run at the same time. If you request more tasks than that, the extra tasks are placed in a queue and are started one after another as running tasks finish.
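The queueing behavior can be observed with a rough sketch like the following, which records how many tasks are running at once. The state dictionary and the 0.1 second sleep are assumptions made for this illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
state = {"running": 0, "peak": 0}  # current and highest number of running tasks


def task(v):
    with lock:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
    time.sleep(0.1)  # stand-in for real work
    with lock:
        state["running"] -= 1
    return v


with ThreadPoolExecutor(max_workers=2) as executor:
    # Six tasks are requested, but at most two run at any moment.
    futures = [executor.submit(task, i) for i in range(6)]
    results = [f.result() for f in futures]
```

After the run, state["peak"] should never exceed the max_workers value of 2, even though six tasks were submitted.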

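Methods to execute tasks: submit and map

The Executor has the following methods to execute parallel tasks.

submit: schedules a single call and returns a Future object through which its result can be received.
map: applies a function to each item of an iterable and returns an iterator over the results, in input order.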
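A minimal sketch contrasting the two methods, assuming a trivial task named double (a name chosen for this illustration):

```python
from concurrent.futures import Future, ThreadPoolExecutor


def double(v):
    return v * 2


with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() schedules one call and returns a Future immediately.
    future = executor.submit(double, 21)
    submit_result = future.result()  # blocks until the call has finished

    # map() schedules one call per item and yields results in input order.
    map_results = list(executor.map(double, [1, 2, 3]))
```

submit tends to fit cases where each task needs individual handling, while map is convenient when the same function is applied to a whole collection.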

Sample

This is a sample using concurrent.futures. It was written for Python 3.6, but I think it will run on 3.2 or later with minor modifications (for example, the thread_name_prefix argument of ThreadPoolExecutor was added in 3.6).
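The samples call an init_logger helper whose definition is not shown. A minimal sketch of what it might look like follows. The format string is an assumption inferred from the log output in the samples (timestamp, thread name, message), and the real helper may differ.

```python
import sys
from logging import INFO, Formatter, StreamHandler, getLogger


def init_logger():
    """Assumed helper: log to stdout as "[time] [thread name] message"."""
    handler = StreamHandler(sys.stdout)
    handler.setLevel(INFO)
    handler.setFormatter(Formatter("[%(asctime)s] [%(threadName)s] %(message)s"))
    logger = getLogger()  # root logger, as used by getLogger() in the samples
    logger.addHandler(handler)
    logger.setLevel(INFO)


init_logger()
getLogger().info("main start")
```

Including the thread name in every line is what makes it possible to see which worker ran which task.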

A simple example using ThreadPoolExecutor

01_thread.py runs 5 tasks on 2 threads. Each task just sleeps for 1 second.

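import time
from concurrent.futures import ThreadPoolExecutor
from logging import getLogger


def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)


def main():
    init_logger()  # logging setup helper, not shown here
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        for i in range(5):
            executor.submit(task, i)
        getLogger().info("submit end")
    # Leaving the with block waits for all submitted tasks to finish.
    getLogger().info("main end")


if __name__ == "__main__":
    main()

Execution result

[2017-04-02 12:01:39,747] [MainThread] main start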
[2017-04-02 12:01:39,748] [thread_0] 0 start
[2017-04-02 12:01:39,749] [thread_1] 1 start
[2017-04-02 12:01:39,750] [MainThread] submit end
[2017-04-02 12:01:40,755] [thread_0] 0 end
[2017-04-02 12:01:40,755] [thread_0] 2 start
[2017-04-02 12:01:40,756] [thread_1] 1 end
[2017-04-02 12:01:40,756] [thread_1] 3 start
[2017-04-02 12:01:41,761] [thread_0] 2 end
[2017-04-02 12:01:41,761] [thread_0] 4 start
[2017-04-02 12:01:41,761] [thread_1] 3 end
[2017-04-02 12:01:42,764] [thread_0] 4 end
[2017-04-02 12:01:42,765] [MainThread] main end

Receive results with submit

02_future.py

This is an example of receiving the return value of each task through the Future object returned by submit.

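import time
from concurrent.futures import ThreadPoolExecutor
from logging import getLogger


def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)
    return v * 2


def main():
    init_logger()  # logging setup helper, not shown here
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        # submit returns a Future; keep each one so its result can be read later.
        futures = []
        for i in range(5):
            futures.append(executor.submit(task, i))
        getLogger().info("submit end")
        # result() blocks until the corresponding task has finished.
        getLogger().info([f.result() for f in futures])
    getLogger().info("main end")


if __name__ == "__main__":
    main()

Execution result

[2017-04-02 12:08:23,853] [MainThread] main start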
[2017-04-02 12:08:23,854] [thread_0] 0 start
[2017-04-02 12:08:23,855] [thread_1] 1 start
[2017-04-02 12:08:23,856] [MainThread] submit end
[2017-04-02 12:08:24,856] [thread_0] 0 end
[2017-04-02 12:08:24,856] [thread_0] 2 start
[2017-04-02 12:08:24,857] [thread_1] 1 end
[2017-04-02 12:08:24,857] [thread_1] 3 start
[2017-04-02 12:08:25,863] [thread_0] 2 end
[2017-04-02 12:08:25,864] [thread_0] 4 start
[2017-04-02 12:08:25,864] [thread_1] 3 end
[2017-04-02 12:08:26,867] [thread_0] 4 end
[2017-04-02 12:08:26,868] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:08:26,868] [MainThread] main end
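
02_future.py reads the results in submission order. As a hedged aside, concurrent.futures also provides as_completed, which yields each Future as it finishes. The sketch below assumes tasks with different sleep times (chosen only for this illustration) so that completion order can differ from submission order.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def task(v):
    time.sleep(0.05 * (3 - v))  # smaller v sleeps longer
    return v * 2


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in range(3)]
    # as_completed yields futures in the order they finish.
    in_completion_order = [f.result() for f in as_completed(futures)]
    # Iterating over the original list keeps submission order.
    in_submission_order = [f.result() for f in futures]
```

Both lists contain the same values. Only the ordering may differ, which matters when results should be handled as soon as they become available.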

Example of batch addition of tasks / batch acquisition of results with map

03_map.py: If you want to process tasks in a batch, it is easier to write with map than with submit.

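import time
from concurrent.futures import ThreadPoolExecutor
from logging import getLogger


def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)
    return v * 2


def main():
    init_logger()  # logging setup helper, not shown here
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        # map returns an iterator right away, so "map end" is logged before the tasks finish.
        results = executor.map(task, range(5))
        getLogger().info("map end")
    # The results come back in the same order as the inputs.
    getLogger().info(list(results))
    getLogger().info("main end")


if __name__ == "__main__":
    main()

Execution result

[2017-04-02 12:10:03,997] [MainThread] main start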
[2017-04-02 12:10:03,998] [thread_0] 0 start
[2017-04-02 12:10:04,000] [thread_1] 1 start
[2017-04-02 12:10:04,000] [MainThread] map end
[2017-04-02 12:10:05,005] [thread_0] 0 end
[2017-04-02 12:10:05,006] [thread_0] 2 start
[2017-04-02 12:10:05,006] [thread_1] 1 end
[2017-04-02 12:10:05,006] [thread_1] 3 start
[2017-04-02 12:10:06,007] [thread_0] 2 end
[2017-04-02 12:10:06,007] [thread_0] 4 start
[2017-04-02 12:10:06,007] [thread_1] 3 end
[2017-04-02 12:10:07,014] [thread_0] 4 end
[2017-04-02 12:10:07,014] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:10:07,014] [MainThread] main end
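
03_map.py passes a single iterable to map. Like the built-in map, Executor.map also accepts several iterables and passes one item from each as positional arguments. A small sketch using the built-in pow as the task (the input values are arbitrary):

```python
from concurrent.futures import ThreadPoolExecutor

bases = [2, 3, 4]
exponents = [2, 2, 2]

with ThreadPoolExecutor(max_workers=2) as executor:
    # Each call receives one item from each iterable: pow(2, 2), pow(3, 2), pow(4, 2).
    powers = list(executor.map(pow, bases, exponents))
```

This can be an alternative to packing several arguments into a single tuple per task.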

Example of executing heavy processing with ProcessPoolExecutor

04_process.py: Finally, an example using ProcessPoolExecutor. Let's change the parameters and see the difference in performance.

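import sys
from concurrent.futures import ProcessPoolExecutor
from logging import getLogger
from random import random
from time import time


def task(params):
    # Defined at module level so it can be pickled and sent to a worker process.
    (v, num_calc) = params
    a = float(v)
    for _ in range(num_calc):
        a = pow(a, a)  # CPU-bound busy work
    return a


def main():
    init_logger()  # logging setup helper, not shown here

    if len(sys.argv) != 5:
        print("usage: 04_process.py max_workers chunk_size num_tasks num_calc")
        sys.exit(1)
    (max_workers, chunk_size, num_tasks, num_calc) = map(int, sys.argv[1:])

    start = time()

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        params = map(lambda _: (random(), num_calc), range(num_tasks))
        results = executor.map(task, params, chunksize=chunk_size)
    getLogger().info(sum(results))

    getLogger().info("{:.3f}".format(time() - start))


if __name__ == "__main__":
    # The guard keeps worker processes from re-running main() when they import this module.
    main()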

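Parameter description

max_workers: maximum number of worker processes, passed to ProcessPoolExecutor
chunk_size: number of tasks handed to a worker at a time, passed to map as chunksize
num_tasks: number of tasks to run
num_calc: number of pow iterations performed in each task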

Execution environment

Execution result

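The first four rows are the results for a small number of large tasks (100 tasks of 100,000 iterations each), and the remaining six rows are the results for a large number of small tasks (10,000 tasks of 1,000 iterations each). For the large tasks, going from 1 to 2 workers roughly halves the time regardless of chunk_size. For the small tasks with chunk_size 1, 2 workers are no faster than 1 (3.423 vs. 3.295 seconds), and the benefit of the second worker only appears with chunk_size 10 or 100. It seems that specifying chunk_size is important when executing a large number of small tasks.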

max_workers  chunk_size  num_tasks  num_calc  Execution time (sec)
1            1           100        100,000   1.954
2            1           100        100,000   1.042
1            10          100        100,000   1.922
2            10          100        100,000   1.071
1            1           10,000     1,000     3.295
2            1           10,000     1,000     3.423
1            10          10,000     1,000     2.272
2            10          10,000     1,000     1.279
1            100         10,000     1,000     2.126
2            100         10,000     1,000     1.090
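
One plausible reading of these numbers: with ProcessPoolExecutor, every batch sent to a worker crosses a process boundary, and chunksize groups tasks so that fewer, larger batches are sent. The sketch below only counts batches for the 10,000-task case. The chunks helper is written for this illustration and is not the library's internal code.

```python
from itertools import islice


def chunks(iterable, chunk_size):
    """Yield successive tuples of up to chunk_size items (illustrative only)."""
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk


num_tasks = 10_000
batches_by_chunk_size = {
    size: sum(1 for _ in chunks(range(num_tasks), size)) for size in (1, 10, 100)
}
# batches_by_chunk_size == {1: 10000, 10: 1000, 100: 100}
```

With chunk_size 1, each of the 10,000 small tasks is sent on its own, so the per-batch overhead is likely to dominate the short computation. Larger chunks spread that cost over many tasks.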
