You can adjust the degree of parallelism by specifying `max_workers` on the Executor, but what happens if you submit tasks faster than the workers can process them? `submit` does not block. Instead, the pending tasks appear to be held in memory. Because of this behavior, submitting a large number of tasks can consume a lot of memory.

    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=10) as executor:
        for i in range(1024 * 1024):  # a large number of tasks
            executor.submit(fn, i)
        # The for loop finishes quickly, but memory consumption balloons

In fact, code that loops one million times like this consumed about 2 GB of memory. So I decided to think about how to deal with it.
Looking at the internal implementation, ThreadPoolExecutor holds a queue internally, and `submit` creates an object called `_WorkItem` and puts it on that queue. This internal queue has no upper limit, so putting to it never blocks and you can keep submitting endlessly.
By the way, worker threads are created at the time of queuing (up to `max_workers`), and [each worker thread fetches items from the queue and executes them in an infinite loop](https://github.com/python/cpython/blob/v3.8.6/Lib/concurrent/futures/thread.py#L66).
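To make the unbounded queue visible, here is a minimal sketch that submits tasks which all block on an event and then reads the size of the internal queue. Note that `_work_queue` is a private attribute of CPython's ThreadPoolExecutor, and the names `blocked_task` and `pending_after_submits` are hypothetical helpers introduced only for this illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Event used to keep every task blocked until we have inspected the queue.
release = threading.Event()


def blocked_task():
    """Hypothetical task: waits until the event is set."""
    release.wait()


def pending_after_submits(n_tasks=1000, max_workers=2):
    """Submit n_tasks blocked tasks and return how many are still queued."""
    release.clear()
    executor = ThreadPoolExecutor(max_workers=max_workers)
    try:
        for _ in range(n_tasks):
            executor.submit(blocked_task)  # returns immediately, never blocks
        # _work_queue is a private CPython detail (an assumption of this sketch).
        return executor._work_queue.qsize()
    finally:
        release.set()  # let the workers drain the queue
        executor.shutdown(wait=True)
```

With two workers, at most two items can have been taken off the queue, so nearly all of the submitted tasks are still sitting in memory when the loop ends.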
Let's actually observe the behavior. For example, run a function that takes 0.01 seconds 5000 times with `max_workers=10`.
Inside the for loop, print the timestamp and memory usage (maxrss this time) as progress.
From the timestamps, you can see that `submit` does not block: the for loop that submits the tasks finishes almost immediately, and most of the time is spent waiting for shutdown. However, you can also see that memory consumption increases as the run progresses.
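An experiment along these lines could be sketched as follows. This is an illustrative reconstruction, not the original measurement script: `slow_task`, `run_experiment`, and `report_every` are hypothetical names, and the `resource` module is assumed to be available (it is Unix-only).

```python
import resource  # Unix-only; assumed available for this sketch
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(i):
    """Hypothetical workload that takes about 0.01 seconds."""
    time.sleep(0.01)
    return i


def run_experiment(n_tasks=5000, max_workers=10, report_every=500):
    """Submit n_tasks and print elapsed time and maxrss as the loop progresses."""
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i in range(n_tasks):
            executor.submit(slow_task, i)
            if i % report_every == 0:
                # ru_maxrss is in KiB on Linux and in bytes on macOS.
                maxrss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                elapsed = time.monotonic() - start
                print(f"submitted={i} elapsed={elapsed:.3f}s maxrss={maxrss}")
        submit_elapsed = time.monotonic() - start
    # Leaving the with block waits for shutdown, i.e. for all tasks to finish.
    total_elapsed = time.monotonic() - start
    return submit_elapsed, total_elapsed
```

Calling `run_experiment()` returns the time spent in the submit loop and the total time including shutdown, so the two can be compared directly.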
This is the first method I thought of (Plan 1): make the queue used inside ThreadPoolExecutor a bounded queue, by subclassing it and replacing the instance variable.
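A minimal sketch of this idea, assuming the private attribute `_work_queue` that CPython's ThreadPoolExecutor uses (it may change between versions); the class name `BoundedQueueThreadPoolExecutor` and the `queue_size` parameter are hypothetical:

```python
import queue
from concurrent.futures import ThreadPoolExecutor


class BoundedQueueThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor whose internal work queue has an upper limit."""

    def __init__(self, max_workers=None, queue_size=0, **kwargs):
        super().__init__(max_workers=max_workers, **kwargs)
        # Replace the unbounded internal queue with a bounded one before any
        # worker thread exists. _work_queue is a private implementation
        # detail, so this relies on CPython internals (an assumption).
        self._work_queue = queue.Queue(maxsize=queue_size)
```

With this in place, `submit` blocks in `put` whenever `queue_size` items are already waiting, so the number of pending work items stays bounded.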
You can see from the timestamps that `submit` now blocks during the loop. However, the total time hardly changes, and memory consumption grows only very slowly.
The code is simple, but it feels a bit sloppy to depend on the internal implementation, and ProcessPoolExecutor does not have this kind of queue, so this method doesn't work there.
Since Plan 1 was not good enough, I looked for another approach and found the reference article.
Following the reference, create a class BoundedExecutor that wraps a pool executor. Since it is API compatible (except for `map`), it can be used as a drop-in replacement.
Internally, it controls concurrency by decrementing a semaphore at submit time and incrementing it when the worker finishes processing. "When the worker finishes processing" means "when the function registered with the future's `add_done_callback` is called on completion". (The callback is called both when the worker's processing completes normally and when it raises an exception, so this should be correct.)
This gave the same result as Plan 1.
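The semaphore mechanism described above can be sketched roughly like this. It is an illustration under assumptions rather than the original class: the constructor signature, the `executor_class` parameter, and the interpretation of `bounded_ratio` as "allowed in-flight tasks divided by `max_workers`" are guesses made for this example.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore


class BoundedExecutor:
    """Wraps a pool executor and blocks submit() when too many tasks are pending."""

    def __init__(self, max_workers, bounded_ratio=2, executor_class=ThreadPoolExecutor):
        self._executor = executor_class(max_workers=max_workers)
        # Assumption: the limit on in-flight tasks is max_workers * bounded_ratio.
        self._semaphore = BoundedSemaphore(int(max_workers * bounded_ratio))

    def submit(self, fn, *args, **kwargs):
        self._semaphore.acquire()  # count down; blocks when the limit is reached
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._semaphore.release()  # submission failed, give the slot back
            raise
        # Count up when the task finishes, whether it succeeded or raised.
        future.add_done_callback(lambda _: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.shutdown(wait=True)
        return False
```

Because it only uses the public `submit`, `add_done_callback`, and `shutdown` methods, the same wrapper should also work when `executor_class` is `ProcessPoolExecutor`.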
By the way, it is better to make the queue size larger than `max_workers` (in the code, pass or change the argument so that `bounded_ratio = 1` becomes `bounded_ratio = 2`). If you set "degree of parallelism == queue size", there will be moments when the queue is empty and workers sit idle, slightly delaying overall completion. Therefore, it is better to make it a little larger.