Full understanding of Python threading and multiprocessing


Today's major operating systems, such as Mac OS, UNIX, Linux, and Windows, all support multitasking.

What is multitasking? Consider a situation where you launch a browser, listen to music, and write a report in Word: at least three tasks are running at the same time. And besides these foreground tasks, various OS-related tasks are quietly running in the background.

It's easy to understand that a multi-core CPU can multitask, but a single-core CPU can multitask too: the OS executes each task in turns. For example, task 1 runs for 0.01 seconds, then task 2 for 0.01 seconds, then task 3 for 0.01 seconds, then task 1 again, and so on. Because the CPU is fast, it feels almost simultaneous. This alternating execution is called "[concurrent computing](https://ja.wikipedia.org/wiki/%E4%B8%A6%E8%A1%8C%E8%A8%88%E7%AE%97)".

Of course, a single-core CPU executes tasks in turns, so truly simultaneous progress is only possible on a multi-core CPU. Processing multiple tasks at the same time, one on each core, is called "[parallel computing](https://ja.wikipedia.org/wiki/%E4%B8%A6%E5%88%97%E8%A8%88%E7%AE%97)". In most cases the number of running tasks far exceeds the number of cores, so "alternate execution" also happens on multi-core machines.

For the OS, one task is one process. For example, launching a browser creates a single browser process. Similarly, when you open Word, a Word process is created.

A process does not necessarily do just one thing. Word, for example, performs many kinds of processing at once: monitoring user input, spell checking, updating the UI. These "subtasks" are called threads. Every process has at least one thread, and when there are multiple threads they are executed in turns, just like processes.

There are two main ways to handle multitasking in Python: multiple processes and multiple threads.

Of course, you can also have multiple threads inside multiple processes, but this is not recommended because the model becomes complex.

When multitasking, tasks sometimes need to communicate and cooperate: task 1 may need to pause while task 2 runs, or task 3 and task 4 may not be allowed to proceed at the same time. This makes the program a little more complicated.

[Figure: the relationship between threads and processes. A process holds the various resources allocated to it by the OS.] (Source: Overview of System Software lecture)

1. threading

The main methods for controlling threads are as follows.

Method | Description
--- | ---
start() | Start the thread
setName() | Give the thread a name
getName() | Get the thread's name
setDaemon(True) | Make the thread a daemon
join() | Wait until the thread finishes processing
run() | Run the thread's processing manually

Python threads are not simulated processes; they are real [POSIX threads](https://ja.wikipedia.org/wiki/POSIX%E3%82%B9%E3%83%AC%E3%83%83%E3%83%89). The standard library provides two modules: `_thread`, a low-level module, and `threading`, which wraps it. So you normally use `threading`.

1-1. Instantiation

You can start a thread by passing a function (or other callable) to create a `Thread` instance and calling `start()`.

import threading
import time


def run(n):
    # threading.current_thread().name calls getName() internally
    print("task: {} (thread name: {})".format(n, threading.current_thread().name))
    time.sleep(1)
    print('2s')
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)


t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",), name='Thread T2')  # passing name= calls setName()
# start()
t1.start()
t2.start()
# join()
t1.join()
t2.join()
# because join() was called,
# the main thread waits for the threads above to finish
# and prints only after they are all done
print(threading.current_thread().name)

Execution result:

task: t1 (thread name: Thread-1)
task: t2 (thread name: Thread T2)
2s
2s
1s
1s
0s
0s
MainThread

You can see that t1 and t2 are executed alternately. One of the rules for this alternation, switching after an I/O operation (which `print` counts as), is explained in more detail in the GIL section.

1-2. Customize

It is also possible to inherit from `Thread` and override the thread class's `run` method.

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()
        self.n = n

    # override run()
    def run(self):
        print("task: {}".format(self.n))
        time.sleep(1)
        print('2s')
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)


t1 = MyThread("t1")
t2 = MyThread("t2")

t1.start()
t2.start()

Execution result:

task: t1
task: t2
2s
2s
1s
1s
0s
0s

1-3. Counting threads

You can count the number of active threads with `active_count()`. Note that in a REPL environment there are extra monitoring threads, so the count will be higher than you might expect.

Execute the following code in a script.

import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(1)


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    t.start()

time.sleep(0.5)
print(threading.active_count())

Execution result:

task: t0
task: t1
task: t2
4

When the main thread's `print` runs, the other threads are still working, so the count is 3 + 1 (the main thread) = 4.

import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(0.5)


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    t.start()

time.sleep(1)
print(threading.active_count())

Execution result:

task: t0
task: t1
task: t2
1

By adjusting the execution times so that the main thread's `print` runs later, only the main thread remains and the active count becomes 1.

1-4. Daemon thread

Start the thread as a daemon.

import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(1)
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    # setDaemon(True)
    t.setDaemon(True) 
    t.start()

time.sleep(1.5)
print('Number of threads: {}'.format(threading.active_count()))

Execution result:

task: t0
task: t1
task: t2
3
3
3
Number of threads: 4

Since t0, t1, and t2 are daemon threads of the main thread, they stop when the main thread exits. For example, Word's spell checker is a daemon thread running in an infinite loop, but it goes down when the main thread does.

1-5. GIL

In other programming languages, a multi-core CPU can run as many threads simultaneously as it has cores. Python, however, runs only one thread at a time within one process; in short, Python multithreading is purely concurrent, never parallel. The reason is the [GIL (Global Interpreter Lock)](https://ja.wikipedia.org/wiki/%E3%82%B0%E3%83%AD%E3%83%BC%E3%83%90%E3%83%AB%E3%82%A4%E3%83%B3%E3%82%BF%E3%83%97%E3%83%AA%E3%82%BF%E3%83%AD%E3%83%83%E3%82%AF).

The GIL is a kind of exclusive lock (explained later). When Python was first designed, the GIL was adopted for data safety and for ease of integration with C libraries. To run, a thread must first acquire the GIL, and since there is only one GIL per interpreter process, only one thread can execute at a time. The GIL is like a passport: a thread without it cannot enter the CPU. Incidentally, the GIL exists in CPython (the standard distribution) but not in Jython or IronPython. Ruby is another well-known language with a GIL.

1-5-1. Multithreading procedure in CPython

  1. Acquire resources
  2. Request the GIL
  3. The Python interpreter asks the OS for a native thread
  4. The OS schedules the CPU to execute
  5. When the GIL release rules are met, the GIL is released, whether or not the computation has finished
  6. Another thread repeats the steps above
  7. When the GIL comes around again, the thread resumes where it left off (a context switch), until the release rules are met once more

1-5-2. GIL release rules by version

  • Python 2.x
    • Released when an I/O operation occurs
    • Released when ticks reaches 100
      • ticks is a counter recording how many bytecode instructions have run since the GIL was last acquired
      • when it reaches 100, the GIL is released and the counter resets to 0
      • the threshold can be set with `sys.setcheckinterval`
  • Python 3.x
    • ticks was discarded
    • a timer measures elapsed time, and the GIL is released when the threshold (5 ms by default) is exceeded; see the sketch below
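In Python 3 the threshold can be inspected and changed at runtime with `sys.getswitchinterval` and `sys.setswitchinterval`; a minimal sketch (the 0.01 value is just an example):

import sys

print(sys.getswitchinterval())  # 0.005 by default (5 ms)
sys.setswitchinterval(0.01)     # make thread switches half as frequent
print(sys.getswitchinterval())  # 0.01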

As an experiment, let's run a simple infinite loop.

import threading
import multiprocessing


def loop():
    x = 0
    while True:
        x = x ^ 1


for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()

(Screenshot: CPU usage while the loop runs)

As you can see, because of the GIL, CPU utilization stays at around 100% for a single process (one core's worth) no matter how hard you try, even though up to 400% would be available on a quad-core CPU.

1-5-3. Computational efficiency of Python for different kinds of tasks

  • CPU-bound tasks
    • The GIL is released at fixed intervals and the threads are switched, so the switching overhead adds up and execution slows down (see the benchmark sketch below).
  • I/O-bound tasks
    • Threads are switched on every I/O operation, so other work proceeds instead of waiting for slow file or network access. This is efficient.
  • To make full use of a multi-core CPU, multiprocessing is recommended: each process has its own GIL.
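To see the CPU-bound case concretely, here is a minimal benchmark sketch of my own (the workload size is arbitrary); on CPython the two-thread version is usually no faster than the sequential one, and often slower, because of GIL switching:

import threading
import time


def countdown(n):
    # pure CPU-bound work
    while n > 0:
        n -= 1


N = 10_000_000

# run the work twice sequentially
start = time.time()
countdown(N)
countdown(N)
print('sequential: {:.2f}s'.format(time.time() - start))

# run the same work in two threads
start = time.time()
t1 = threading.Thread(target=countdown, args=(N,))
t2 = threading.Thread(target=countdown, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()
print('threaded:   {:.2f}s'.format(time.time() - start))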

1-6. Thread control

Threads in the same process share resources. And because thread switching is random and unordered, shared data can get corrupted.

import threading


# bank balance
balance = 0


def change_it(n):
    # deposit then withdraw the same amount; the net change should be 0
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    for i in range(100000):
        change_it(n)


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

If you run the code above several times, you will see that the result is sometimes non-zero.

`balance = balance + n` can be split into two operations (it is not atomic):

x = balance + n
balance = x

The `x` here is a local variable; each thread has its own `x`. When the code above is executed in order, the result is as follows.

balance = 0 #initial value

t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1     # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1     # balance = 0

t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2     # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2     # balance = 0
    
balance = 0 #The result is correct

However, if the order is different, the result will be different.

balance = 0 #initial value

t1: x1 = balance + 5  # x1 = 0 + 5 = 5

t2: x2 = balance + 8  # x2 = 0 + 8 = 8
t2: balance = x2      # balance = 8

t1: balance = x1      # balance = 5
t1: x1 = balance - 5  # x1 = 5 - 5 = 0
t1: balance = x1      # balance = 0

t2: x2 = balance - 8  # x2 = 0 - 8 = -8
t2: balance = x2   # balance = -8

balance = -8 #The result is wrong

This phenomenon, where the results of multithreaded code are unpredictable, is called being thread-unsafe.

To solve this, you need to lock the resource to control the threads.

1-6-1. Exclusive control (mutex)

import threading


# bank balance
balance = 0


def change_it(n):
    # acquire the lock
    lock.acquire()
    global balance
    balance = balance + n
    balance = balance - n
    #Release the lock
    lock.release()


def run_thread(n):
    for i in range(100000):
        change_it(n)


lock = threading.Lock()  #Instantiate a lock

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

With exclusive control, no other thread can access the resource until the lock is released. This way, the result is always 0.
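As an aside, `Lock` (like `RLock` and `Semaphore`) supports the context-manager protocol, so the acquire/release pair can be written more safely with `with`; a sketch of the same `change_it`:

import threading

balance = 0
lock = threading.Lock()


def change_it(n):
    global balance
    with lock:  # acquired here; released automatically, even if an exception occurs
        balance = balance + n
        balance = balance - n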

1-6-2. Reentrant (recursive) exclusive control

A reentrant lock is an exclusive lock that the thread already holding it can acquire again, so nested acquisitions do not deadlock.

import threading


#Save money
balance = 0


def add_it(n):
    lock.acquire()
    global balance
    balance = balance + n
    lock.release()  # every acquire needs a matching release
    return balance


def sub_it(n):
    lock.acquire()
    global balance
    balance = balance - n
    lock.release()
    return balance


def change_it(n):
    # acquire the outer lock; the nested acquires in add_it/sub_it
    # would deadlock with a plain Lock, but an RLock allows them
    lock.acquire()
    global balance
    balance = add_it(n)
    balance = sub_it(n)
    # release the outer lock
    lock.release()


def run_thread(n):
    for i in range(1000):
        change_it(n)


lock = threading.RLock()  #Instantiate a lock

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

Here, the lock is also acquired inside `add_it` and `sub_it` while `change_it` already holds it. With a plain `Lock` this nested acquisition would deadlock; an `RLock` lets the thread that holds the lock acquire it again. Note that a reentrant lock must be released as many times as it was acquired. Since all this locking is computationally expensive, the number of loop iterations is reduced here.

1-6-3. Bounded Semaphore control

With exclusive control, only one thread at a time can work on a resource; a semaphore instead allows up to a fixed number of threads to proceed simultaneously. Think of a restroom with three stalls: three people can use it at the same time, and everyone else waits in line.

import threading
import time


def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("current thread: {}\n".format(n))
    semaphore.release()


semaphore = threading.BoundedSemaphore(5)  #Allows simultaneous processing of 5 threads

for i in range(22):
    t = threading.Thread(target=run, args=("t-{}".format(i),))
    t.start()

while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('-----All threads have finished-----')

If you run the code above, you can see the "current thread" lines printed five at a time.

1-6-4. Event control

Thread events let the main thread control other threads. `Event` provides the following methods.

Method | Description
--- | ---
clear | Set the internal flag to False
set | Set the internal flag to True
is_set | Return True if the flag is True
wait | Block while the flag is False; return once it becomes True

import threading
import time

event = threading.Event()


def lighter():
    '''
    flag=True: green light
    flag=False: red light
    '''
    count = 0
    event.set()  #The initial value is green light
    while True:
        if 5 < count <= 10:
            event.clear()  #Make a red light
            print("\33[41;1m red light...\033[0m")
        elif count > 10:
            event.set()  #Make a green light
            count = 0
        else:
            print("\33[42;1m green light...\033[0m")

        time.sleep(1)
        count += 1


def car(name):
    while True:
        if event.is_set():  # check whether the light is green
            print("[{}] moving forward...".format(name))
            time.sleep(1)
        else:
            print("[{}] red light, waiting...".format(name))
            event.wait()  # blocks here until flag becomes True
            print("[{}] green light, moving forward again...".format(name))


light = threading.Thread(target=lighter,)
light.start()

car = threading.Thread(target=car, args=("MINI",))
car.start()

With the code above, we implemented simple communication between the traffic-light thread and the car thread using an event.

1-6-5. Timer control

You can also use a timer to control threads by time.

from threading import Timer


def hello():
    print("hello, world")


t = Timer(1, hello)
t.start()  # hello() runs 1 second later
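A `Timer` fires only once, and it can also be stopped before it fires with `cancel`:

from threading import Timer


def hello():
    print("hello, world")


t = Timer(30, hello)
t.start()
t.cancel()  # cancelled before it fires, so hello() never runs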

1-6-6. Condition control

There is also a way to control threads by waiting on conditions. `Condition` provides the following methods.

Method | Description
--- | ---
wait | Suspend the thread until it is notified or the optional timeout elapses
notify | Wake suspended threads (n=1 by default); may only be called while holding the lock
notifyAll | Wake all suspended threads

import threading
import time
from random import randint
from collections import deque


class Producer(threading.Thread):
    def run(self):
        global stocks
        while True:
            if lock_con.acquire():
                products = [randint(0, 100) for _ in range(5)]
                stocks.extend(products)
                print('Producer {} produced {}.'.format(self.name, stocks))
                lock_con.notify()
                lock_con.release()
            time.sleep(3)


class Consumer(threading.Thread):
    def run(self):
        global stocks
        while True:
            lock_con.acquire()
            if len(stocks) == 0:
                # when stock runs out, wait here until something is produced;
                # the thread is suspended until notified
                lock_con.wait()
            print('Customer {} bought {}. stock: {}'.format(self.name, stocks.popleft(), stocks))
            lock_con.release()
            time.sleep(0.5)


stocks = deque()
lock_con = threading.Condition()
p = Producer()
c = Consumer()
p.start()
c.start()

Execution result:

Producer Thread-1 produced deque([73, 2, 93, 52, 21]).
Customer Thread-2 bought 73. stock: deque([2, 93, 52, 21])
Customer Thread-2 bought 2. stock: deque([93, 52, 21])
Customer Thread-2 bought 93. stock: deque([52, 21])
Customer Thread-2 bought 52. stock: deque([21])
Customer Thread-2 bought 21. stock: deque([])
Producer Thread-1 produced deque([6, 42, 85, 56, 76]).
Customer Thread-2 bought 6. stock: deque([42, 85, 56, 76])
Customer Thread-2 bought 42. stock: deque([85, 56, 76])
Customer Thread-2 bought 85. stock: deque([56, 76])
Customer Thread-2 bought 56. stock: deque([76])
Customer Thread-2 bought 76. stock: deque([])

It is a simple program: whenever the customer has bought up all the stock, the producer makes five more products.

1-6-7. Barrier control

This control releases all waiting threads at once when the specified number of threads have reached the barrier. For example, in an online multiplayer game you can implement a gate that waits, up to a time limit, until the party has the required number of players. `Barrier` provides the following methods.

Method | Description
--- | ---
wait | Wait at the barrier; once the specified number of threads are waiting, they are all released
reset | Return the barrier to its empty state; threads currently waiting receive BrokenBarrierError
abort | Put the barrier into the broken state; current waiters, and any thread that tries to pass afterwards, receive BrokenBarrierError

import threading

num = 4


def start():
    print('The game started because there are {} players.'.format(num))


barrier = threading.Barrier(num, action=start)


class Player(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if not barrier.broken:
                print('{} joined.'.format(self.name))
                barrier.wait(2)
        except threading.BrokenBarrierError:
            print('{} left because the game could not start.'.format(self.name))


players = []
for i in range(10):
    p = Player(name='Player {}'.format(i))
    players.append(p)

for p in players:
    p.start()

Execution result:

Player 0 joined.
Player 1 joined.
Player 2 joined.
Player 3 joined.
The game started because there are 4 players.
Player 4 joined.
Player 5 joined.
Player 6 joined.
Player 7 joined.
The game started because there are 4 players.
Player 8 joined.
Player 9 joined.
Player 8 left because the game could not start.
Player 9 left because the game could not start.

Threads run in arbitrary order, so the output will not always match the above. Here, Players 8 and 9 could not fill a team of four (the barrier) within the time limit, so they were forced to leave (BrokenBarrierError).

1-7. ThreadLocal

I explained that data is shared between threads and must be locked to compute correct results. Sometimes, however, you want each thread to handle its own local variables.

import threading


#Create ThreadLocal object in global scope
local_school = threading.local()

def process_student():
    # get the student bound to the current thread
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    # bind the name to student on the ThreadLocal object
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

Execution result:

Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

The `local_school` here is a global variable, but since it is a `ThreadLocal` object, each thread can set the instance attribute `student` without affecting the others. You can think of `local_school` as a dictionary: you could bind `teacher` as well as `student`, and each thread operates on its own bindings independently. A typical use of `ThreadLocal` is giving each thread its own DB connection, HTTP request context, and so on. From a thread's point of view, all the data it receives behaves like a local variable that other threads cannot touch.
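For example, here is a minimal sketch of a per-thread DB connection; sqlite3 and the file name are used purely for illustration:

import sqlite3
import threading

local_data = threading.local()


def get_connection():
    # create the connection once per thread and cache it on the
    # thread-local object; other threads never see this attribute
    if not hasattr(local_data, 'conn'):
        local_data.conn = sqlite3.connect('example.db')  # hypothetical DB file
    return local_data.conn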

2. multiprocessing

On Unix-like operating systems, you can create a process with the `fork()` system call. Calling `fork()` copies the current process; the copy is called the child process and the original becomes its parent. `fork()` returns in both processes: the child receives 0, and the parent receives the child's ID (the parent must record the IDs of its children). A child can get its parent's ID with `os.getppid()`.

Python's `os` module wraps these system calls.

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

Execution result:

Process (19148) start...
I (19148) just created a child process (19149).
I am child process (19149) and my parent is 19148.

Here, the parent process and the child process enter different branches of the conditional. Note that Windows has no `fork()` system call, so this code cannot run there.

With `fork()`, a process that takes on a new task can create a new process to handle it. For example, the famous Apache server has the parent process listen on the port and `fork()` child processes to handle each new http request.

When writing multiprocess Python programs, the standard library's `multiprocessing` module is recommended. It is a module for parallel processing; it is sometimes said that `multiprocessing` exists precisely because `threading` cannot achieve parallelism due to the GIL.

The `multiprocessing` module is also cross-platform, so you can write multiprocess programs on Windows. As mentioned above, Windows has no `fork()`, so when `multiprocessing` creates a process there it performs a pseudo-`fork()`: all the Python objects of the parent process are serialized with `pickle` and passed to the child process. So if a `multiprocessing` call fails on Windows, the pickling may have failed.
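Related to this, Windows starts child processes by launching a fresh interpreter that re-imports your script, so process-creating code should be guarded with `if __name__ == '__main__':`. The Unix examples below omit the guard for brevity; a minimal sketch of the guarded form:

from multiprocessing import Process


def work():
    print('child')


if __name__ == '__main__':  # prevents recursive process creation on Windows
    p = Process(target=work)
    p.start()
    p.join()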

If you want to create a child process that runs an external command, you can use the standard library's `subprocess` (introduced later); here, I first cover running Python functions in multiple processes with the `multiprocessing` module.

2-1. Process

You can easily create a child process with `Process`.

from multiprocessing import Process
import os


#What the child process does
def run_proc(name):
    print('Run child process {} ({})...'.format(name, os.getpid()))


print('Parent process {}.'.format(os.getpid()))
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')

Execution result:

Parent process 19218.
Child process will start.
Run child process test (19219)...
Child process end.

Pass the target function and its arguments to `Process`, create an instance, and start it with `start()`; this is simpler than using `fork()` directly. `join()` makes the parent process wait until the child finishes, just as with threads.

2-2. Process Pool

Creating a child process is expensive, so when you need a large number of them it is more efficient to create a process pool with `Pool`. The main methods of `Pool` are as follows.

Method | Description
--- | ---
apply | Execute a task synchronously
apply_async | Execute a task asynchronously
terminate | Terminate the pool immediately
join | The parent waits for the child processes to finish; may only be called after close or terminate
close | Accept no more tasks and exit once all running tasks finish

from multiprocessing import Pool
import os
import time
import random


def long_time_task(name):
    print('Run task {} ({})...'.format(name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task {} runs {} seconds.'.format(name, (end - start)))


print('Parent process {}.'.format(os.getpid()))
p = Pool(4)  #Up to 4 child processes at the same time
for i in range(5):
    p.apply_async(long_time_task, args=(i,))
# apply_async is asynchronous, so the parent does not wait for the children
# and immediately executes the next print
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')

Execution result:

Parent process 19348.
Waiting for all subprocesses done...
Run task 0 (19349)...
Run task 1 (19350)...
Run task 2 (19351)...
Run task 3 (19352)...
Task 1 runs 0.8950300216674805 seconds.
Run task 4 (19350)...
Task 2 runs 1.0132842063903809 seconds.
Task 4 runs 0.3936619758605957 seconds.
Task 3 runs 2.3689510822296143 seconds.
Task 0 runs 2.776203155517578 seconds.
All subprocesses done.

Since the pool size is 4, task 4 starts only after one of tasks 0 through 3 finishes.
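As an aside, `apply_async` returns an `AsyncResult` whose `get` method fetches the function's return value, and `Pool.map` is a blocking, parallel version of the built-in `map`; a minimal sketch:

from multiprocessing import Pool


def square(x):
    return x * x


if __name__ == '__main__':
    with Pool(4) as p:
        results = [p.apply_async(square, (i,)) for i in range(5)]
        print([r.get() for r in results])  # [0, 1, 4, 9, 16]
        print(p.map(square, range(5)))     # same values, as a blocking call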

2-3. Interprocess communication

Unlike threads, processes do not share data. The OS offers many mechanisms for interprocess communication, and `multiprocessing` wraps these low-level OS features to make them easy to use.

2-3-1. Queue

Queues, a FIFO data structure, are often used for interprocess communication.

from multiprocessing import Process, Queue
import os
import time
import random


#Write data to Queue
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put {} to queue...'.format(value))
        q.put(value)
        time.sleep(random.random())


#Read data from Queue
def read(q):
    print('Process to read: {}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get {} from queue.'.format(value))


#The parent process creates a Queue and passes it to the child process
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
#Start pw and start writing
pw.start()
#Start pr and start reading
pr.start()
#Wait for pw to finish
pw.join()
#pr is an infinite loop, so kill it
pr.terminate()

Execution result:

Process to write: 19489
Put A to queue...
Process to read: 19490
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Even if the reading is slow, it can be retrieved in the correct order because it is a FIFO.

2-3-2. Pipe

As the name suggests, a pipe can be thought of as a pipe-shaped data structure: one end puts data in with the `send` method and the other end takes data out with the `recv` method. Note that data can be corrupted if two processes send to (or receive from) the same end of the pipe at the same time.

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()

Execution result:

[42, None, 'hello']

2-3-3. Shared memory

I explained that data is not shared between processes, but that is not strictly true: as an OS feature, [shared memory](https://ja.wikipedia.org/wiki/%E5%85%B1%E6%9C%89%E3%83%A1%E3%83%A2%E3%83%AA#%E3%82%BD%E3%83%95%E3%83%88%E3%82%A6%E3%82%A7%E3%82%A2%E3%81%AB%E3%82%88%E3%82%8B%E5%85%B1%E6%9C%89%E3%83%A1%E3%83%A2%E3%83%AA) between processes is possible. In Python, `Value` and `Array` let you keep numeric data and arrays in shared memory. As an aside, `Value` and `Array` use C data structures directly; Python numbers (which inherit from the numbers classes) are immutable and cannot be rewritten in place.

from multiprocessing import Process, Value, Array


def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]


num = Value('d', 0.0)  #double type number
arr = Array('i', range(10))  #Array

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print(num.value)
print(arr[:])

Execution result:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
* The `multiprocessing.shared_memory` module was added in Python 3.8. I will update this section once my miniconda catches up. *
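For reference, a minimal sketch of the Python 3.8+ interface, based on the documented API:

from multiprocessing import shared_memory

# create a 4-byte shared block; other processes can attach to it by name
shm = shared_memory.SharedMemory(create=True, size=4)
shm.buf[:4] = bytearray([1, 2, 3, 4])

# a second handle, as another process would obtain it
attached = shared_memory.SharedMemory(name=shm.name)
print(bytes(attached.buf[:4]))  # b'\x01\x02\x03\x04'

attached.close()
shm.close()
shm.unlink()  # free the block; call once, from the creating side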

2-3-4. Manager

It may be more accurate to say that a manager shares data rather than passes it. `Manager()` returns a manager object and starts a server process. Through the server process, other processes can operate on Python objects via proxies. Manager objects support the `list`, `dict`, `Namespace`, `Lock`, `RLock`, `Semaphore`, `BoundedSemaphore`, `Condition`, `Event`, `Barrier`, `Queue`, `Value`, and `Array` types.

from multiprocessing import Process, Manager


def f(d, l, i):
    d[i] = i
    d[str(i)] = str(i)
    l.append(i)
    print(l)


with Manager() as manager:
    shared_dict = manager.dict()
    shared_list = manager.list()
    p_list = []
    #Create 10 processes
    for i in range(10):
        p = Process(target=f, args=(shared_dict, shared_list, i))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()

    print('All subprocesses done.')
    print(shared_dict)
    print(shared_list)

Execution result:

[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6, 8]
[0, 1, 2, 3, 4, 5, 6, 8, 7]
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]
All subprocesses done.
{0: 0, '0': '0', 1: 1, '1': '1', 2: 2, '2': '2', 3: 3, '3': '3', 4: 4, '4': '4', 5: 5, '5': '5', 6: 6, '6': '6', 8: 8, '8': '8', 7: 7, '7': '7', 9: 9, '9': '9'}
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]

Here the manager is used to create a list and a dictionary shared between processes. You can see that the processes did not run in order.

2-3-5. Process locks

Like threads, processes have locks.

from multiprocessing import Process, Lock


def f(i):
    lock.acquire()
    try:
        print('hello world', i)
    finally:
        lock.release()


lock = Lock()

for num in range(10):
    Process(target=f, args=(num,)).start()

Execution result:

hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9

Thanks to the lock, the lines are printed one at a time, in order, unlike before. However, serializing the work this way means you cannot exploit the performance of multiple processes.

2-4. Distributed processing

Python processes can be distributed across multiple machines. The `managers` submodule of the `multiprocessing` module can spread processes over several machines, and you can write distributed programs without knowing the details of the communication protocol.

Distributed processing requires a server process that distributes tasks and worker processes that actually execute them. First, implement the server process, `task_master.py`.

Here, `managers` exposes the queues over the network as an **API**. Once the server process has started the queues and put in tasks, other machines can access them.

task_master.py


import random
import queue  # the standard-library queue suffices; the manager exposes it over the network
from multiprocessing.managers import BaseManager


#Queue to send tasks
task_queue = queue.Queue()
#Queue to receive results
result_queue = queue.Queue()


class QueueManager(BaseManager):
    pass


#Register two queues as api
#On Windows, lambdas cannot be used for API registration, so define plain functions instead
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

# bind port 5000 and set the auth key to b'abc'
# on Windows you must specify the address explicitly ('127.0.0.1')
manager = QueueManager(address=('', 5000), authkey=b'abc')
# start the manager
manager.start()
#Get a queue object over the net
task = manager.get_task_queue()
result = manager.get_result_queue()

#Try to put in a task
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task {}...'.format(n))
    task.put(n)

#receive the result from the result queue
print('Try get results...')
for i in range(10):
    #If it exceeds 10 seconds, it ends with timeout
    r = result.get(timeout=10)
    print('Result: {}'.format(r))

#End
manager.shutdown()
print('master exit.')

Next, implement the worker process, `task_worker.py`. It gets tasks through the **API** `manager.get_task_queue` published above and processes them.

task_worker.py


import time
import queue
from multiprocessing.managers import BaseManager


#Create the same Queue Manager
class QueueManager(BaseManager):
    pass


#Register the names only; the server provides the actual queues
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

#Connect to the server
server_addr = '127.0.0.1'
print('Connect to server {}...'.format(server_addr))
#Use the same port and auth key as the server
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
#Connect
m.connect()

#Get each queue
task = m.get_task_queue()
result = m.get_result_queue()

#Receive a task from the task queue
#Store the processing result in the result queue
for i in range(10):
    try:
        n = task.get(timeout=1)
        #Here, the task is simple square calculation.
        print('run task {} * {}...'.format(n, n))
        r = '{} * {} = {}'.format(n, n, n*n)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print('task queue is empty.')

#End
print('worker exit.')

It can also be run on a local machine.

Execution result: first, the server process puts tasks into the `task_queue`. Once they are all in, it waits for results to appear in the `result_queue`.

task_master.py


Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...

The worker process then connects to the server, takes tasks from the `task_queue`, and processes them. Each result is sent to the `result_queue`.

task_worker.py


Connect to server 127.0.0.1...
run task 7710 * 7710...
run task 6743 * 6743...
run task 8458 * 8458...
run task 2439 * 2439...
run task 1351 * 1351...
run task 9885 * 9885...
run task 5532 * 5532...
run task 4181 * 4181...
run task 6093 * 6093...
run task 3815 * 3815...
worker exit.

As results arrive in the `result_queue`, the server process prints them in order.

task_master.py


Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...
Result: 7710 * 7710 = 59444100
Result: 6743 * 6743 = 45468049
Result: 8458 * 8458 = 71537764
Result: 2439 * 2439 = 5948721
Result: 1351 * 1351 = 1825201
Result: 9885 * 9885 = 97713225
Result: 5532 * 5532 = 30603024
Result: 4181 * 4181 = 17480761
Result: 6093 * 6093 = 37124649
Result: 3815 * 3815 = 14554225
master exit.

All the queues live in the server process, since the worker process creates no queues of its own:

(Figure: master/worker queue architecture. Source: [Liao Xuefeng's official site](https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600))

In this way, distributed processing can be realized in Python, and you can obtain powerful computing capacity by adding more workers.

3. subprocess

On Unix-like OSes, as explained above, `fork()` makes a copy of the current process as a child process, so calling `os.fork()` in Python creates a child that runs your Python program. Sometimes, though, you need a child process that runs an external command rather than Python code. Unix-like OSes provide another system call, `exec()`, available in Python as `os.execve()`, which replaces the current process with another program. In other words, `os.fork()` creates a child copy of the Python program, and `os.execve()` replaces it with another program, something you could run in a shell such as `ls` or `ping`. The standard library `subprocess` is a module for creating child processes that run external programs: when you run an external program with `subprocess`, pipes are set up for interprocess communication between the Python process and the child, so you can pass parameters and receive return values and errors.
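A minimal Unix-only sketch of the fork-then-exec pattern that `subprocess` automates for you:

import os

pid = os.fork()
if pid == 0:
    # child: replace this process image with the external ls program
    os.execvp('ls', ['ls', '-l'])
else:
    # parent: wait for the child to finish
    os.waitpid(pid, 0)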

3-1. subprocess.run

Starting with Python 3.5, the officially recommended way to run a command is `subprocess.run`. I omit the older **API** such as `subprocess.call` here.

subprocess.run(args, *, stdin=None, input=None, 
    stdout=None, stderr=None, shell=False, timeout=None, check=False, universal_newlines=False)

`subprocess.run` returns an instance of the `CompletedProcess` class, which has the following attributes.

Attribute | Description
--- | ---
args | The arguments passed to the child process; a string or a list
returncode | The status code after execution
stdout | Captured standard output
stderr | Captured standard error
check_returncode() | Raise CalledProcessError if the status code is non-zero (execution failed)

Here are some examples of using `subprocess.run`.

You can capture the standard output with `subprocess.PIPE` (otherwise the output goes straight to the console).

import subprocess


# subprocess.run(["ls", "-l"] stdout=subprocess.PIPE)Same as
obj = subprocess.run(["ls", "-l"], stdout=subprocess.PIPE)
print('stdout:\n{}'.format(obj.stdout.decode()))

Execution result:

stdout:
total 128
-rw-r--r--@ 1 kaito  staff   692 Feb 16 19:35 1-1.py
-rw-r--r--@ 1 kaito  staff   509 Feb 17 23:39 1-2.py
-rw-r--r--@ 1 kaito  staff   364 Feb 19 16:48 2-10.py
-rw-r--r--@ 1 kaito  staff   645 Feb 19 19:12 2-17.py
-rw-r--r--@ 1 kaito  staff   213 Feb 19 19:14 2-18.py
-rw-r--r--@ 1 kaito  staff   209 Feb 19 19:18 2-19.py
-rw-r--r--@ 1 kaito  staff   318 Feb 19 23:53 2-20.py
-rw-r--r--@ 1 kaito  staff   194 Feb 19 23:57 2-21.py
-rw-r--r--@ 1 kaito  staff   230 Feb 20 15:46 2-23.py
-rw-r--r--@ 1 kaito  staff   131 Feb 18 19:39 2-4.py
-rw-r--r--@ 1 kaito  staff   543 Feb 18 19:50 2-8.py
-rw-r--r--@ 1 kaito  staff   240 Feb 18 22:29 2-9.py
-rw-r--r--  1 kaito  staff  1339 Feb 27 00:25 task_master.py
-rw-r--r--  1 kaito  staff  1086 Feb 27 00:31 task_worker.py
-rw-r--r--  1 kaito  staff   446 Feb 27 20:26 test.py
-rw-r--r--  1 kaito  staff   199 Feb 27 20:31 test2.py

If `check` is set to True, an error is raised when the status code is non-zero.

subprocess.run("exit 1", shell=True, check=True)

Execution result:

Traceback (most recent call last):
  File "test2.py", line 4, in <module>
    subprocess.run("exit 1", shell=True, check=True)
  File "/Users/kaito/opt/miniconda3/lib/python3.7/subprocess.py", line 487, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1.
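This error can be caught like any other exception; a minimal sketch:

import subprocess

try:
    subprocess.run("exit 1", shell=True, check=True)
except subprocess.CalledProcessError as e:
    print('command failed with status {}'.format(e.returncode))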

The `__repr__` of the `CompletedProcess` class looks like this.

print(subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE))

Execution result:

CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0, stdout=b'crw-rw-rw-  1 root  wheel    3,   2 Feb 27 20:37 /dev/null\n')

3-2. subprocess.Popen

For more advanced operations, you can use the `subprocess.Popen` class.

class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, 
    preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False,
    startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=())

The methods of the `subprocess.Popen` class are as follows.

Method | Description
--- | ---
poll | Return the status code if the child process has finished, otherwise None
wait | Wait for the child process to finish; raise TimeoutExpired on timeout
communicate | Exchange data with the child process
send_signal | Send a signal to the child process; for example, signal.SIGINT is the signal Ctrl+C sends on a Unix-like command line
terminate | Terminate the child process
kill | Kill the child process

Here are some examples of using `subprocess.Popen`.

You can run your Python code as an external program.

import subprocess


#Pipe to standard input, standard output, standard error
p = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#Write data to standard input
p.stdin.write(b'print("stdin")\n')
#Pass data as input for communicate
out, err = p.communicate(input=b'print("communicate")\n')
print(out.decode())

Execution result:

stdin
communicate

Pipeline processing, like the shell's `|`, can be built by connecting the standard output of one child process to the standard input of another with a pipe.

import subprocess


# connect the two child processes with a pipe
p1 = subprocess.Popen(['df', '-h'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'Data'], stdin=p1.stdout, stdout=subprocess.PIPE)
out, err = p2.communicate()  # df -h | grep Data
print(out.decode())

Execution result:

/dev/disk1s1   466Gi  438Gi  8.0Gi    99% 1156881 4881295959    0%   /System/Volumes/Data
map auto_home    0Bi    0Bi    0Bi   100%       0          0  100%   /System/Volumes/Data/home

4. concurrent.futures

We have now seen Python's multithreading and multiprocessing. You may find them somewhat complicated and hard to grasp, and honestly, they are (laughs). Languages designed from the start around simple concurrency and parallelism, such as [Go](https://ja.wikipedia.org/wiki/Go_(%E3%83%97%E3%83%AD%E3%82%B0%E3%83%A9%E3%83%9F%E3%83%B3%E3%82%B0%E8%A8%80%E8%AA%9E)), may be showing the direction in which programming languages will evolve.

However, Python's evolution has not stopped. `concurrent`, a high-level package that further wraps `threading` and `multiprocessing` to make them easier to use, is still being developed.

At present, `concurrent` contains only one module, `futures`, a Python implementation of the [Future pattern](https://ja.wikipedia.org/wiki/Future_%E3%83%91%E3%82%BF%E3%83%BC%E3%83%B3). Here I introduce the features available at this time.

4-1. Executor and Future

`concurrent.futures` provides `ThreadPoolExecutor` and `ProcessPoolExecutor`, both of which inherit from the `Executor` class. Both take an argument `max_workers` specifying the number of threads or processes. A single task is executed with the `submit` method, which returns an instance of the `Future` class.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests


def load_url(url):
    return requests.get(url)


if __name__ == '__main__':
    url = 'https://www.python.org/'
    executor = ProcessPoolExecutor(max_workers=4)  # ThreadPoolExecutor(max_workers=4)
    future = executor.submit(load_url, url)
    print(future)
    while 1:
        if future.done():
            print('status code: {}'.format(future.result().status_code))
            break

Execution result:

<Future at 0x10ae058d0 state=running>
status code: 200

A simple http request. Note that `ProcessPoolExecutor` requires the `__main__` module, so do not run it in a REPL environment.

The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.

4-2. map, as_completed and wait

The `submit` method runs only one task; to run several tasks, use `map`, `as_completed`, or `wait`.

The `map` method takes a function and a sequence as arguments and returns a generator over the execution results.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests


URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']


def load_url(url):
    return requests.get(url)


if __name__ == '__main__':
    # with ThreadPoolExecutor(max_workers=4) as executor:
    with ProcessPoolExecutor(max_workers=4) as executor:
        for url, data in zip(URLS, executor.map(load_url, URLS)):
            print('{} - status_code {}'.format(url, data.status_code))

Execution result:

https://google.com - status_code 200
https://www.python.org/ - status_code 200
https://api.github.com/ - status_code 200

The `as_completed` function takes the submitted tasks and yields their `Future` objects as they complete, blocking while no task has finished.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import requests


URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']


def load_url(url):
    return url, requests.get(url).status_code


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        for future in as_completed(tasks):
            print(*future.result())

Execution result:

https://google.com 200
https://www.python.org/ 200
https://api.github.com/ 200

The `wait` function blocks the main thread or process until the given condition is met. Three conditions can be set with the `return_when` argument.

Condition | Description
--- | ---
ALL_COMPLETED | Unblock when all tasks are complete
FIRST_COMPLETED | Unblock when any one task completes
FIRST_EXCEPTION | Unblock when any task raises an error

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED
import requests


URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']


def load_url(url):
    requests.get(url)
    print(url)


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        wait(tasks, return_when=ALL_COMPLETED)
        print('all completed.')  # the main process is unblocked only after the three prints above

Execution result:

https://www.python.org/
https://api.github.com/
https://google.com
all completed.

reference

  • Concurrent Execution
  • threading --- Thread-based parallelism
  • multiprocessing --- Process-based parallelism
  • subprocess --- Subprocess management
  • concurrent.futures --- Launching parallel tasks
