Today's major operating systems, such as macOS, UNIX, Linux, and Windows, all support multitasking.
What is multitasking? Suppose you have a browser open, are listening to music, and are writing a report in Word: at least three tasks are running at the same time. Besides these foreground tasks, various OS-related tasks are quietly running in the background.
It's easy to see how a multi-core CPU can multitask, but a single-core CPU can multitask too: the OS runs each task in turn. For example, it runs task 1 for 0.01 seconds, then task 2 for 0.01 seconds, then task 3 for 0.01 seconds, then task 1 again, and so on. Because the CPU is fast, this feels almost simultaneous. This kind of interleaved execution is called ["concurrent computing"](https://ja.wikipedia.org/wiki/%E4%B8%A6%E8%A1%8C%E8%A8%88%E7%AE%97).
Of course, a single-core CPU only takes turns, so truly simultaneous execution is possible only on a multi-core CPU. Running multiple tasks at the same time, one on each core, is called ["parallel computing"](https://ja.wikipedia.org/wiki/%E4%B8%A6%E5%88%97%E8%A8%88%E7%AE%97). In most cases the number of running tasks far exceeds the number of cores, so a multi-core CPU also interleaves tasks.
For the OS, one task is one process. For example, launching a browser creates a single browser process. Similarly, when you open Word, a Word process is created.
A process does not necessarily do just one thing. Word, for example, does many things at once, such as monitoring user input, checking spelling, and drawing the UI. These "subtasks" are called threads. Every process has at least one thread. When a process has multiple threads, they are executed in turn, just like processes.
There are two main ways to multitask in Python: running multiple processes, or running multiple threads.
Of course, you can also run multiple threads in each of several processes, but this is not recommended because the model becomes complicated.
Multitasking also brings extra requirements. Tasks may need to communicate and cooperate, task 1 may need to pause while task 2 runs, or task 3 and task 4 may not be allowed to proceed at the same time. All of this makes the program somewhat more complicated.
(Source: Overview of System Software Lecture)
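The following minimal sketch is not part of the original material. It makes the process/thread distinction concrete. Three threads each record the process ID (`os.getpid()`) and their own thread identifier (`threading.get_ident()`). A `threading.Barrier` (covered later) keeps all three threads alive at the same moment, because a thread identifier can be reused once its thread exits. The expected result is one shared PID and three distinct thread IDs.

```python
import os
import threading

records = []                      # (pid, thread_id) pairs collected by the threads
records_lock = threading.Lock()
barrier = threading.Barrier(3)    # keeps all three threads alive at the same time


def report():
    barrier.wait()
    # Every thread lives in the same process, so os.getpid() is shared,
    # while threading.get_ident() identifies the individual thread
    with records_lock:
        records.append((os.getpid(), threading.get_ident()))


threads = [threading.Thread(target=report) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

pids = {pid for pid, _ in records}
thread_ids = {tid for _, tid in records}
print("distinct PIDs:", len(pids))
print("distinct thread IDs:", len(thread_ids))
```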
Method | Description |
---|---|
start() | Start the thread |
setName() | Give the thread a name |
getName() | Get the name of the thread |
setDaemon(True) | Make the thread a daemon thread |
join() | Wait until the thread finishes |
run() | The thread's work; start() calls it in the new thread, and calling it directly runs it in the current thread |
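Since Python 3.10, `setName()`, `getName()`, and `setDaemon()` are deprecated in favor of the `name` and `daemon` attributes. The following short sketch shows the attribute style. The thread name `worker-1` and the 0.1-second sleep are arbitrary examples.

```python
import threading
import time


def work():
    time.sleep(0.1)


t = threading.Thread(target=work, name="worker-1")
t.daemon = True          # attribute form of setDaemon(True); must be set before start()
print(t.name)            # attribute form of getName()
t.start()
alive_while_running = t.is_alive()
t.join()                 # wait for the thread to finish
alive_after_join = t.is_alive()
print(alive_while_running, alive_after_join)
```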
Python threads are not simulated by the interpreter. They are real OS threads ([POSIX threads](https://ja.wikipedia.org/wiki/POSIX%E3%82%B9%E3%83%AC%E3%83%83%E3%83%89) on Unix-like systems). The standard library provides two modules for them, `_thread` and `threading`. `_thread` is a low-level module, and `threading` wraps it, so you will normally use `threading`.
To start a thread, create a `Thread` instance by passing it a function (and its arguments), then call `start()`.
```python
import threading
import time


def run(n):
    # threading.current_thread().name returns the same value as getName()
    print("task: {} (thread name: {})".format(n, threading.current_thread().name))
    time.sleep(1)
    print('2s')
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)


t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",), name='Thread T2')  # name= sets the name, like setName()

# start() launches each thread
t1.start()
t2.start()

# join() makes the main thread wait until each thread finishes
t1.join()
t2.join()

# Because of join(), this line runs only after both threads are done
print(threading.current_thread().name)
```
Execution result:
task: t1 (thread name: Thread-1)
task: t2 (thread name: Thread T2)
2s
2s
1s
1s
0s
0s
MainThread
You can see that t1 and t2 run alternately. One of the rules for when threads switch is that a thread gives up control around I/O operations (such as `print`) and `time.sleep`. This is explained in more detail in the GIL section.
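Because `time.sleep` (like blocking I/O) lets other threads run, threads that spend most of their time waiting overlap in time. The following minimal sketch starts three threads that each sleep 0.3 seconds (an arbitrary value). The total should be close to 0.3 seconds rather than the 0.9 seconds a sequential run would take.

```python
import threading
import time


def wait_a_bit():
    # time.sleep releases the GIL, so the other threads can run meanwhile
    time.sleep(0.3)


start = time.perf_counter()
threads = [threading.Thread(target=wait_a_bit) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
print("3 sleeps of 0.3 s took {:.2f} s".format(elapsed))
```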
You can also subclass `Thread` and override its `run` method.
```python
import threading
import time


class MyThread(threading.Thread):
    def __init__(self, n):
        super().__init__()
        self.n = n

    # Override run(); start() calls it in the new thread
    def run(self):
        print("task: {}".format(self.n))
        time.sleep(1)
        print('2s')
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)


t1 = MyThread("t1")
t2 = MyThread("t2")
t1.start()
t2.start()
```
Execution result:
task: t1
task: t2
2s
2s
1s
1s
0s
0s
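`start()` does not hand back anything that `run()` returns. One common workaround is to store the result on the thread object and read it after `join()`. The following sketch uses a hypothetical `SquareThread` class to show this.

```python
import threading


class SquareThread(threading.Thread):
    """Hypothetical example: a Thread subclass that keeps the value computed in run()."""

    def __init__(self, n):
        super().__init__()
        self.n = n
        self.result = None

    def run(self):
        # run() executes in the new thread once start() is called
        self.result = self.n * self.n


threads = [SquareThread(n) for n in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()          # after join() returns, the result attribute is safe to read
results = [t.result for t in threads]
print(results)  # [0, 1, 4, 9, 16]
```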
You can count the threads that are currently alive with `threading.active_count()`. However, some interactive environments run extra helper threads of their own, so the count can be higher than expected there.
Run the following code as a script.
```python
import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(1)


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    t.start()

time.sleep(0.5)
print(threading.active_count())
```
Execution result:
task: t0
task: t1
task: t2
4
When the main thread's `print` runs, the other three threads are still running, so the count is 3 + 1 (the main thread) = 4.
```python
import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(0.5)


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    t.start()

time.sleep(1)
print(threading.active_count())
```
Execution result:
task: t0
task: t1
task: t2
1
In this version the main thread's `print` runs after the other threads have finished, so only the main thread is left and the count drops to 1.
Next, start the threads as daemon threads.
```python
import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(1)
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    # Same as setDaemon(True), which is deprecated since Python 3.10
    t.daemon = True
    t.start()

time.sleep(1.5)
print('Number of threads: {}'.format(threading.active_count()))
```
Execution result:
task: t0
task: t1
task: t2
3
3
3
Number of threads: 4
Because t0, t1, and t2 are daemon threads, they are stopped when the main thread ends. That is why '2' and '1' are never printed. For example, Word's spell checker could be a daemon thread running in an infinite loop: when the main program goes away, the spell checker goes away too.
1-5. GIL
In other programming languages, a multi-core CPU can run as many threads at once as it has cores. Python (more precisely, CPython) runs only one thread at a time within a process. In short, Python multithreading is purely concurrent, not parallel. The reason is the [GIL (Global Interpreter Lock)](https://ja.wikipedia.org/wiki/%E3%82%B0%E3%83%AD%E3%83%BC%E3%83%90%E3%83%AB%E3%82%A4%E3%83%B3%E3%82%BF%E3%83%97%E3%83%AA%E3%82%BF%E3%83%AD%E3%83%83%E3%82%AF).
The GIL is a kind of exclusive control (explained later). It was introduced early in Python's design to keep the interpreter's data safe and to make it easier to integrate C libraries. A thread must acquire the GIL before it can run. A Python process normally runs a single interpreter with a single GIL, so only one thread can execute at a time. The GIL is like a passport: threads that do not hold it cannot get onto the CPU. CPython (the standard Python distribution) has a GIL, and so does PyPy, but Jython does not. Ruby is another well-known language with a GIL.
Older versions of Python used `sys.setcheckinterval` to control how often the running thread had to give up the GIL (a count of bytecode instructions). Since Python 3.2 the switch is time-based and is set with `sys.setswitchinterval`. `sys.setcheckinterval` was removed in Python 3.9.
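You can inspect and change the current switch interval with `sys.getswitchinterval()` and `sys.setswitchinterval()`. The following small sketch uses an arbitrary value of 1 ms and restores the previous setting afterwards.

```python
import sys

# CPython asks the running thread to release the GIL after this many seconds
original = sys.getswitchinterval()
print("switch interval:", original)   # typically 0.005 (5 ms)

sys.setswitchinterval(0.001)          # request more frequent switching (1 ms)
changed = sys.getswitchinterval()
sys.setswitchinterval(original)       # restore the previous setting
print("changed to:", changed)
```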
As an experiment, let's start one infinite loop per CPU core.
```python
import multiprocessing
import threading


def loop():
    x = 0
    while True:
        x = x ^ 1


# Warning: these loops never end, so the program has to be stopped by terminating the process
for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()
```
If you watch the CPU usage, you will see that this single process uses only about 100% CPU (one core) no matter how many threads it runs, because of the GIL. A quad-core CPU could provide up to 400%.
Threads in the same process share resources. The interpreter can switch threads at almost any point, in an order you cannot predict, so shared data can get corrupted.
```python
import threading

# Bank balance shared by both threads
balance = 0


def change_it(n):
    # Deposit n and then withdraw n: the net change should be 0
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    for i in range(100000):
        change_it(n)


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
```
If you run the code above several times, the result is sometimes non-zero. How often this happens depends on the Python version and the machine.
`balance = balance + n` is actually carried out in two steps:
x = balance + n
balance = x
Here `x` is a local variable, and each thread has its own `x`. If the steps run in the following order, the result is correct:
balance = 0 #initial value
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2 # balance = 0
balance = 0 #The result is correct
However, if the order is different, the result will be different.
balance = 0 #initial value
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance - 8 # x2 = 0 - 8 = -8
t2: balance = x2 # balance = -8
balance = -8 #The result is wrong
Code like this, whose result becomes unpredictable when several threads run it at once, is called thread-unsafe.
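To see why `balance = balance + n` is not one indivisible step, you can disassemble a function that contains it with the `dis` module. The exact instruction names differ between Python versions (for example `BINARY_ADD` in older versions and `BINARY_OP` in 3.11+). In every version, though, reading the global, adding, and writing back are separate instructions, and a thread switch can happen between them. The following sketch uses a hypothetical `deposit` function.

```python
import dis

balance = 0


def deposit(n):
    global balance
    balance = balance + n


# Collect the bytecode instruction names for deposit()
ops = [ins.opname for ins in dis.get_instructions(deposit)]
print(ops)
```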
To make it thread-safe, protect the shared data with a lock so that only one thread at a time can update it.
```python
import threading

# Bank balance shared by both threads
balance = 0
lock = threading.Lock()  # create the lock once, before any thread uses it


def change_it(n):
    global balance
    # Acquire the lock
    lock.acquire()
    try:
        balance = balance + n
        balance = balance - n
    finally:
        # Always release the lock, even if an exception occurs
        lock.release()


def run_thread(n):
    for i in range(100000):
        change_it(n)


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
```
With exclusive control, no other thread can enter the locked section until the lock is released, so the result is always 0.
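You can also use the same lock in a `with` statement. It acquires the lock on entry and always releases it on exit, even if an exception is raised. Here is a sketch of the balance example in that style:

```python
import threading

balance = 0
lock = threading.Lock()


def change_it(n):
    global balance
    # "with lock" acquires the lock and releases it automatically,
    # even if the block raises an exception
    with lock:
        balance = balance + n
        balance = balance - n


def run_thread(n):
    for _ in range(100000):
        change_it(n)


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)  # 0
```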
A reentrant lock (`RLock`) is a lock that the thread already holding it can acquire again, which allows nested locking.
```python
import threading

# Bank balance shared by both threads
balance = 0
lock = threading.RLock()  # a reentrant lock


def add_it(n):
    global balance
    # Acquired again while change_it() already holds the lock
    with lock:
        balance = balance + n
        return balance


def sub_it(n):
    global balance
    with lock:
        balance = balance - n
        return balance


def change_it(n):
    global balance
    # Outer acquisition; add_it() and sub_it() re-acquire the same RLock
    with lock:
        balance = add_it(n)
        balance = sub_it(n)


def run_thread(n):
    for i in range(1000):
        change_it(n)


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
```
Here, `add_it` and `sub_it` also acquire the lock while `change_it` is already holding it. With a plain `Lock`, the second `acquire()` would block forever (a deadlock). An `RLock`, however, lets the thread that owns it acquire it again. Every `acquire()` must still be matched by a `release()`. Other threads can take the `RLock` only after the owning thread has released it as many times as it acquired it, and the `with` statement takes care of this pairing. The loop count is reduced here because each iteration now takes the lock three times.
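You can check the difference between `Lock` and `RLock` in a single thread with a non-blocking `acquire(blocking=False)`, which returns `False` instead of waiting. Here is a minimal sketch:

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

plain.acquire()
# A second acquire by the same thread would block forever with a plain Lock;
# blocking=False makes it return False instead of deadlocking
plain_again = plain.acquire(blocking=False)
plain.release()

reentrant.acquire()
reentrant_again = reentrant.acquire(blocking=False)  # the owner may re-acquire an RLock
# Each acquire() needs its own release(); the lock is free only after both
reentrant.release()
reentrant.release()

print(plain_again, reentrant_again)  # False True
```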
A lock lets only one thread use a resource at a time, whereas a semaphore lets up to a fixed number of threads proceed at the same time. Think of a restroom with three stalls: three people can use it at once, and everyone else waits in line.
```python
import threading
import time


def run(n):
    semaphore.acquire()
    try:
        time.sleep(1)
        print("current thread: {}\n".format(n))
    finally:
        semaphore.release()


semaphore = threading.BoundedSemaphore(5)  # allow up to 5 threads at the same time
threads = []
for i in range(22):
    t = threading.Thread(target=run, args=("t-{}".format(i),))
    t.start()
    threads.append(t)

# Wait for every thread instead of busy-waiting on active_count()
for t in threads:
    t.join()
print('-----All threads have finished-----')
```
If you run the code above, you can see the "current thread" lines printed five at a time.
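The following sketch checks the limit directly instead of relying on the printed output. It counts how many threads are inside the semaphore at once, using a limit of 3 and 10 threads (both arbitrary). It also shows why `BoundedSemaphore` is useful: releasing it more times than it was acquired raises `ValueError`, which catches that kind of bug.

```python
import threading
import time

semaphore = threading.BoundedSemaphore(3)
counter_lock = threading.Lock()
running = 0        # threads currently inside the semaphore
max_running = 0    # highest value observed


def worker():
    global running, max_running
    with semaphore:                 # at most 3 threads get past this line at once
        with counter_lock:
            running += 1
            max_running = max(max_running, running)
        time.sleep(0.05)
        with counter_lock:
            running -= 1


threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("max concurrent:", max_running)

# BoundedSemaphore raises ValueError if released more times than it was acquired
try:
    semaphore.release()
    over_release_error = False
except ValueError:
    over_release_error = True
```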
An event lets one thread control other threads through a shared flag. `Event` provides the following methods.
Method | Description |
---|---|
clear | Set the flag to False |
set | Set the flag to True |
is_set | Return True if the flag is True |
wait | Block while the flag is False; return as soon as it becomes True |
```python
import threading
import time

event = threading.Event()


def lighter():
    '''
    flag=True: green light
    flag=False: red light
    '''
    count = 0
    event.set()  # start with a green light
    while True:
        if 5 < count <= 10:
            event.clear()  # switch to red
            print("\33[41;1m red light...\033[0m")
        elif count > 10:
            event.set()  # switch back to green
            count = 0
        else:
            print("\33[42;1m green light...\033[0m")
        time.sleep(1)
        count += 1


def car(name):
    while True:
        if event.is_set():  # is the light green?
            print("[{}] Moving forward...".format(name))
            time.sleep(1)
        else:
            print("[{}] Red light, waiting for the signal...".format(name))
            event.wait()
            # Blocks here until the flag becomes True
            print("[{}] Green light, starting to move...".format(name))


# Note: both threads run forever, so stop the program manually
light = threading.Thread(target=lighter)
light.start()
car_thread = threading.Thread(target=car, args=("MINI",))
car_thread.start()
```
In the code above, an event provides simple communication between the traffic-light thread and the car thread.
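`wait()` also accepts a timeout and returns whether the flag was set, so a thread can stop waiting after a while. Here is a short sketch:

```python
import threading

ready = threading.Event()
results = []


def waiter():
    # wait() returns True once the flag is set, or False if the timeout expires first
    results.append(ready.wait(timeout=2))


t = threading.Thread(target=waiter)
t.start()
ready.set()          # set the flag to True; every waiting thread wakes up
t.join()

timed_out = threading.Event().wait(timeout=0.05)  # nobody sets this event
print(results, timed_out)  # [True] False
```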
You can also use a `Timer` to run a function after a delay.
```python
from threading import Timer


def hello():
    print("hello, world")


t = Timer(1, hello)
t.start()  # hello() runs after 1 second
```
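A `Timer` that has not fired yet can be cancelled with `cancel()`. In this sketch, the first timer is cancelled before its 0.5-second delay elapses, so only the second timer calls `hello()`.

```python
import threading

fired = []


def hello():
    fired.append("hello, world")


t = threading.Timer(0.5, hello)   # schedule hello() to run after 0.5 seconds
t.start()
t.cancel()                        # cancel() only works while the timer is still waiting
t.join()

t2 = threading.Timer(0.05, hello)
t2.start()
t2.join()                         # wait until the second timer has fired
print(fired)  # ['hello, world']
```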
You can also make threads wait until a condition holds. `Condition` provides the following methods.
Method | Description |
---|---|
wait | Release the lock and block until notified or until the optional timeout expires; must be called while holding the lock |
notify | Wake up waiting threads (n=1 by default); must be called while holding the lock |
notify_all | Wake up all waiting threads (`notifyAll` is the old, deprecated name) |
```python
import threading
import time
from random import randint
from collections import deque


class Producer(threading.Thread):
    def run(self):
        while True:
            with lock_con:
                products = [randint(0, 100) for _ in range(5)]
                stocks.extend(products)
                print('Producer {} produced {}.'.format(self.name, stocks))
                lock_con.notify()  # wake up a waiting consumer
            time.sleep(3)


class Consumer(threading.Thread):
    def run(self):
        while True:
            with lock_con:
                # When the stock runs out, wait until something is produced.
                # A while loop (not if) re-checks the condition after waking up.
                while len(stocks) == 0:
                    lock_con.wait()
                print('Customer {} bought {}. stock: {}'.format(self.name, stocks.popleft(), stocks))
            time.sleep(0.5)


stocks = deque()
lock_con = threading.Condition()
p = Producer()
c = Consumer()
p.start()
c.start()
```
Execution result:
Producer Thread-1 produced deque([73, 2, 93, 52, 21]).
Customer Thread-2 bought 73. stock: deque([2, 93, 52, 21])
Customer Thread-2 bought 2. stock: deque([93, 52, 21])
Customer Thread-2 bought 93. stock: deque([52, 21])
Customer Thread-2 bought 52. stock: deque([21])
Customer Thread-2 bought 21. stock: deque([])
Producer Thread-1 produced deque([6, 42, 85, 56, 76]).
Customer Thread-2 bought 6. stock: deque([42, 85, 56, 76])
Customer Thread-2 bought 42. stock: deque([85, 56, 76])
Customer Thread-2 bought 85. stock: deque([56, 76])
Customer Thread-2 bought 56. stock: deque([76])
Customer Thread-2 bought 76. stock: deque([])
In this simple program, the producer makes five products every three seconds, and the customer waits whenever the stock runs out.
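`Condition.wait_for()` wraps the "check the condition, wait, check again" loop in one call. The following bounded sketch of the producer/consumer idea assumes that the consumer stops after a hypothetical total of 10 items, so the program ends.

```python
import threading
from collections import deque

stocks = deque()
cond = threading.Condition()
bought = []
TOTAL = 10   # hypothetical: the consumer stops after buying 10 items


def producer():
    for batch_start in range(0, TOTAL, 5):
        with cond:
            stocks.extend(range(batch_start, batch_start + 5))
            cond.notify()        # wake the consumer if it is waiting


def consumer():
    while len(bought) < TOTAL:
        with cond:
            # wait_for() re-checks the predicate after every wake-up,
            # which also protects against spurious wake-ups
            cond.wait_for(lambda: len(stocks) > 0)
            bought.append(stocks.popleft())


c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()
p.start()
p.join()
c.join()
print(bought)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```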
A barrier makes threads wait until a specified number of them have reached it, then releases them all together. For example, in an online multiplayer game, you could use a barrier to wait, up to a time limit, until a team has the required number of players. `Barrier` provides the following methods.
Method | Description |
---|---|
wait | Pass the barrier; once the specified number of threads have called it, all waiting threads are released at once |
reset | Return the barrier to its initial state; threads currently waiting get BrokenBarrierError |
abort | Put the barrier into the broken state; threads currently waiting, and any that call wait() afterwards, get BrokenBarrierError |
```python
import threading

num = 4


def start():
    # Runs once (in one of the threads) each time num threads have arrived
    print('The game started because there were {} people.'.format(num))


barrier = threading.Barrier(num, action=start)


class Player(threading.Thread):
    def run(self):
        try:
            if not barrier.broken:
                print('{} participated.'.format(self.name))
                barrier.wait(2)  # give up after 2 seconds
        except threading.BrokenBarrierError:
            print('{} has left because the game cannot be started.'.format(self.name))


players = []
for i in range(10):
    p = Player(name='Player {}'.format(i))
    players.append(p)
for p in players:
    p.start()
```
Execution result:
Player 0 participated.
Player 1 participated.
Player 2 participated.
Player 3 participated.
The game started because there were 4 people.
Player 4 participated.
Player 5 participated.
Player 6 participated.
Player 7 participated.
The game started because there were 4 people.
Player 8 participated.
Player 9 participated.
Player 8 has left because the game cannot be started.
Player 9 has left because the game cannot be started.
Threads run in an unpredictable order, so the output will not always appear exactly as above. Here, the last team (Player 8 and Player 9) did not reach the required number of players before the timeout. The barrier therefore broke, and they were forced to leave (BrokenBarrierError).
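`Barrier.wait()` returns a different index (0 to parties - 1) to each released thread, and `abort()` breaks the barrier on purpose. Here is a minimal sketch with three threads:

```python
import threading

barrier = threading.Barrier(3, action=lambda: print("all 3 arrived"))
indices = []
indices_lock = threading.Lock()


def player():
    # wait() blocks until 3 threads have called it; each gets a distinct index 0..2
    i = barrier.wait(timeout=5)
    with indices_lock:
        indices.append(i)


threads = [threading.Thread(target=player) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(indices))  # [0, 1, 2]

# abort() puts the barrier into the broken state
barrier.abort()
print(barrier.broken)  # True
```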
1-7. ThreadLocal
As explained above, threads share data, so you need locks to get correct results. Sometimes, however, you want each thread to work with its own local variables.
```python
import threading

# Create a ThreadLocal object in the global scope
local_school = threading.local()


def process_student():
    # Get the student bound to the current thread
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))


def process_thread(name):
    # Bind the name to this thread's "student" attribute
    local_school.student = name
    process_student()


t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
```
Execution result:
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
`local_school` here is a global variable. Because it is a `ThreadLocal` object (`threading.local()`), however, each thread can set and use its `student` attribute without affecting the others. You can think of `local_school` as a dictionary. You can bind not only `student` but also other attributes such as `teacher`, and each thread's values stay separate. A typical use of `ThreadLocal` is to give each thread its own DB connection, HTTP request, and so on. From each thread's point of view, the data behaves like a local variable that other threads cannot touch.
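You can check the isolation directly. Values set in one thread are invisible in the others, and a brand-new thread starts with no attributes at all. Here is a sketch:

```python
import threading

local_data = threading.local()
seen = {}


def worker(name):
    local_data.value = name                     # visible only in this thread
    seen[name] = local_data.value


local_data.value = "main"                       # the main thread's own copy
threads = [threading.Thread(target=worker, args=(n,)) for n in ("A", "B")]
for t in threads:
    t.start()
for t in threads:
    t.join()


def check_fresh():
    # A new thread starts with no attributes on local_data
    seen["fresh_has_value"] = hasattr(local_data, "value")


t = threading.Thread(target=check_fresh)
t.start()
t.join()
print(local_data.value, seen)
```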
Unix/Linux systems (including macOS) provide a system call named `fork()`. Calling `fork()` copies the current process. The copy is called the child process, and the original becomes its parent process. `fork()` returns twice, once in the parent and once in the child. In the child it returns 0, and in the parent it returns the child's process ID. The parent gets the ID because it has to keep track of its children, while a child can always get its parent's ID with `getppid()`.
Python's `os` module wraps these system calls.
```python
import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
```
Execution result:
Process (19148) start...
I (19148) just created a child process (19149).
I am child process (19149) and my parent is 19148.
Here, the parent and the child take different branches of the `if`. Windows has no `fork()` system call, so this code cannot run on Windows.
With `fork()`, a process that receives a new task can create a child process to handle it. For example, the well-known Apache server can have the parent process listen on the port and call `fork()` to let a child process handle each new HTTP request.
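The example above does not wait for its child. A parent should wait for each child it creates; otherwise a finished child lingers as a zombie process until the parent exits. The following POSIX-only sketch uses `os.waitpid()` and `os.waitstatus_to_exitcode()` (Python 3.9+). The exit code 7 is arbitrary.

```python
import os

# POSIX-only sketch: os.fork() does not exist on Windows
if hasattr(os, "fork"):
    pid = os.fork()
    if pid == 0:
        # Child: exit immediately with a status code.
        # os._exit skips cleanup that belongs to the parent (buffers, atexit handlers).
        os._exit(7)
    else:
        # Parent: wait for the child so it does not linger as a zombie process
        _, status = os.waitpid(pid, 0)
        exit_code = os.waitstatus_to_exitcode(status)
        print("child {} exited with code {}".format(pid, exit_code))
```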
The standard library's `multiprocessing` module is the recommended way to write multiprocess programs in Python. `multiprocessing` can run work in parallel. It is said to have been added because the `threading` module cannot run in parallel due to the GIL.
The `multiprocessing` module is also cross-platform, so you can write multiprocess programs on Windows as well. Windows has no `fork()`, so `multiprocessing` emulates it there. The child starts as a fresh Python interpreter, and the parent serializes the objects the child needs (the target function and its arguments) with `pickle` and passes them over. So if a `multiprocessing` call fails on Windows, the cause may be a pickling failure. Because the child re-imports the main module, the code that starts processes should be placed under `if __name__ == '__main__':`. The same applies on macOS, where this "spawn" start method has been the default since Python 3.8.
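You can demonstrate the pickling restriction without starting any process. A function defined at module level is pickled by reference (module and name) and round-trips, while a lambda has no importable name and fails. Here is a sketch:

```python
import pickle


def square(x):
    return x * x


# A module-level function is pickled by reference (module + name), so it works
restored = pickle.loads(pickle.dumps(square))
print(restored(4))  # 16

# A lambda has no importable name, so pickling it fails
try:
    pickle.dumps(lambda x: x * x)
    lambda_ok = True
except (pickle.PicklingError, AttributeError) as exc:
    lambda_ok = False
    print("cannot pickle lambda:", type(exc).__name__)
```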
If you want to create a child process that runs an external command, you can use the standard library's `subprocess` module. First, though, let's look at the features `multiprocessing` offers for running Python code in multiple processes.
The `Process` class makes it easy to create a child process.
```python
from multiprocessing import Process
import os


# Code run by the child process
def run_proc(name):
    print('Run child process {} ({})...'.format(name, os.getpid()))


if __name__ == '__main__':  # required on Windows and macOS (spawn start method)
    print('Parent process {}.'.format(os.getpid()))
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')
```
Execution result:
Parent process 19218.
Child process will start.
Run child process test (19219)...
Child process end.
Pass the target function and its arguments to `Process` to create an instance, then start it with `start()`. This is easier than creating a child process with `fork()`. As with threads, `join()` makes the parent process wait until the child process finishes.
Creating a child process is expensive, so if you need a large number of processes, it is more efficient to create a process pool with `Pool`. The main methods of `Pool` are:
Method | Description |
---|---|
apply | Run a function in a worker and block until it returns (synchronous) |
apply_async | Submit a function to a worker and return an `AsyncResult` immediately (asynchronous) |
terminate | Stop the worker processes immediately, without finishing outstanding work |
join | Wait for the worker processes to exit; can only be called after close or terminate |
close | Stop accepting new tasks; the workers exit once all submitted tasks are done |
```python
from multiprocessing import Pool
import os
import time
import random


def long_time_task(name):
    print('Run task {} ({})...'.format(name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task {} runs {} seconds.'.format(name, (end - start)))


if __name__ == '__main__':
    print('Parent process {}.'.format(os.getpid()))
    p = Pool(4)  # run up to 4 child processes at the same time
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    # apply_async does not block, so the parent prints this immediately
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
```
Execution result:
Parent process 19348.
Waiting for all subprocesses done...
Run task 0 (19349)...
Run task 1 (19350)...
Run task 2 (19351)...
Run task 3 (19352)...
Task 1 runs 0.8950300216674805 seconds.
Run task 4 (19350)...
Task 2 runs 1.0132842063903809 seconds.
Task 4 runs 0.3936619758605957 seconds.
Task 3 runs 2.3689510822296143 seconds.
Task 0 runs 2.776203155517578 seconds.
All subprocesses done.
Since the pool size is 4, task 4 will start running after any of task 0 through task 3 finishes.
Unlike threads, processes do not share data. The OS provides many mechanisms for interprocess communication, and `multiprocessing` wraps these low-level features so they are easy to use.
Queues, which are FIFO data structures, are often used for interprocess communication.
```python
from multiprocessing import Process, Queue
import os
import time
import random


# Write data to the queue
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put {} to queue...'.format(value))
        q.put(value)
        time.sleep(random.random())


# Read data from the queue
def read(q):
    print('Process to read: {}'.format(os.getpid()))
    while True:
        value = q.get(True)  # block until an item is available
        print('Get {} from queue.'.format(value))


if __name__ == '__main__':
    # The parent process creates the queue and passes it to the child processes
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start pw to begin writing
    pw.start()
    # Start pr to begin reading
    pr.start()
    # Wait for pw to finish
    pw.join()
    # pr runs an infinite loop, so terminate it
    pr.terminate()
```
Execution result:
Process to write: 19489
Put A to queue...
Process to read: 19490
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Even if the reader is slow, it receives the items in the correct order because the queue is FIFO.
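The reader in the queue example runs forever and is stopped with `terminate()`. A common alternative, sketched here, is to send a sentinel value that tells the reader to finish. To keep the sketch short and runnable anywhere, it uses threads and `queue.Queue`. `multiprocessing.Queue` offers the same `put()`/`get()` methods, so the pattern carries over. The `None` sentinel is an assumption: any value that never appears as real data works.

```python
import queue
import threading

SENTINEL = None           # hypothetical end-of-data marker
q = queue.Queue()
received = []


def write(q):
    for value in ["A", "B", "C"]:
        q.put(value)
    q.put(SENTINEL)       # tell the reader there is nothing more to come


def read(q):
    while True:
        value = q.get()   # blocks until an item is available
        if value is SENTINEL:
            break         # leave the loop instead of being killed
        received.append(value)


w = threading.Thread(target=write, args=(q,))
r = threading.Thread(target=read, args=(q,))
w.start()
r.start()
w.join()
r.join()
print(received)  # ['A', 'B', 'C']
```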
As the name suggests, you can think of a pipe as a pipe-shaped channel. Data put into one end with the `send()` method is received at the other end with the `recv()` method. Data may be corrupted if two processes read from or write to the same end of a pipe at the same time.
```python
from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()
```
Execution result:
[42, None, 'hello']
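You can also try out a `Pipe()` within a single process to see how its two ends behave. This sketch shows three things. Both ends can send and receive by default. `poll()` checks for waiting data. `Pipe(duplex=False)` returns a receive-only end followed by a send-only end.

```python
from multiprocessing import Pipe

# Pipe() returns two Connection objects; by default both ends can send and receive
left, right = Pipe()
left.send({"task": 1})
has_data = right.poll(1)     # True if something arrives within 1 second
message = right.recv()
right.send("ack")
reply = left.recv()

# With duplex=False, the first end can only receive and the second can only send
reader, writer = Pipe(duplex=False)
writer.send([42, None, "hello"])
one_way = reader.recv()

for conn in (left, right, reader, writer):
    conn.close()
print(message, reply, one_way)
```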
I said earlier that processes do not share data, but that was not entirely true...
The OS can provide [shared memory](https://ja.wikipedia.org/wiki/%E5%85%B1%E6%9C%89%E3%83%A1%E3%83%A2%E3%83%AA#%E3%82%BD%E3%83%95%E3%83%88%E3%82%A6%E3%82%A7%E3%82%A2%E3%81%AB%E3%82%88%E3%82%8B%E5%85%B1%E6%9C%89%E3%83%A1%E3%83%A2%E3%83%AA) between processes. In Python, `Value` and `Array` let you keep numbers and arrays in shared memory. As an aside, `Value` and `Array` store the data directly as C data types. Python's built-in numbers (`int`, `float`, and so on) are immutable and cannot be rewritten in place.
```python
from multiprocessing import Process, Value, Array


def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]


if __name__ == '__main__':
    num = Value('d', 0.0)         # 'd': C double
    arr = Array('i', range(10))   # 'i': array of C ints
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])
```
Execution result:
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
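One caution about `Value`: `n.value += 1` reads and then writes, so concurrent updates can be lost, just as in the `balance` example. By default a `Value` is created with a lock, available through `get_lock()`. To keep the sketch short and self-contained, it uses threads in one process. The same pattern applies when the `Value` is passed to several `Process` objects.

```python
import threading
from multiprocessing import Value

counter = Value('i', 0)    # shared C int, created with its own lock


def add_many(n):
    for _ in range(n):
        # "+=" on .value is a read followed by a write, not one atomic step,
        # so hold the Value's lock while updating it
        with counter.get_lock():
            counter.value += 1


threads = [threading.Thread(target=add_many, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)  # 40000
```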
It may be more accurate to say that a manager shares data rather than passing it. `Manager()` returns a manager object and starts a server process. Through the server process, other processes can manipulate Python objects via proxies. Manager objects support `list`, `dict`, `Namespace`, `Lock`, `RLock`, `Semaphore`, `BoundedSemaphore`, `Condition`, `Event`, `Barrier`, `Queue`, `Value`, and `Array`.
```python
from multiprocessing import Process, Manager


def f(d, l, i):
    d[i] = i
    d[str(i)] = str(i)
    l.append(i)
    print(l)


if __name__ == '__main__':
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()
        p_list = []
        # Create 10 processes
        for i in range(10):
            p = Process(target=f, args=(shared_dict, shared_list, i))
            p.start()
            p_list.append(p)
        for p in p_list:
            p.join()
        print('All subprocesses done.')
        print(shared_dict)
        print(shared_list)
```
Execution result:
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6, 8]
[0, 1, 2, 3, 4, 5, 6, 8, 7]
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]
All subprocesses done.
{0: 0, '0': '0', 1: 1, '1': '1', 2: 2, '2': '2', 3: 3, '3': '3', 4: 4, '4': '4', 5: 5, '5': '5', 6: 6, '6': '6', 8: 8, '8': '8', 7: 7, '7': '7', 9: 9, '9': '9'}
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]
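Here, a list and a dictionary shared between processes were created with a manager. You can see that the processes did not run strictly in sequence (8 was appended before 7).
Like threads, processes can use locks.
```python
from multiprocessing import Process, Lock


def f(lock, i):
    lock.acquire()
    try:
        print('hello world', i)
    finally:
        lock.release()


if __name__ == '__main__':
    # Pass the lock to each child explicitly; under spawn, a module-level lock
    # would be re-created in every child and protect nothing
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
```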
Execution result:
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9
Because of the lock, only one process prints at a time, so lines never get mixed together. The numbers happen to appear in order here, but the lock itself does not guarantee any particular order. On the other hand, making the processes take turns like this gives up the performance benefit of using multiple processes.
Python can also distribute processes across multiple machines. The `managers` submodule of `multiprocessing` can spread processes over several machines, so you can write a distributed program without knowing the details of the communication protocol.
Distributed processing needs a server process that hands out tasks and worker processes that actually process them. First, implement the server process, `task_master.py`.
Here, `managers` exposes the queues over the network as an **API**. Once the server process has started and put tasks into the queue, other machines can access them.
task_master.py
```python
import queue  # a plain queue.Queue is enough: other processes reach it only through the manager
import random
from multiprocessing.managers import BaseManager

# Queue for sending tasks
task_queue = queue.Queue()
# Queue for receiving results
result_queue = queue.Queue()


# Named functions instead of lambdas: lambdas cannot be pickled, which makes
# manager.start() fail on Windows (and on macOS, where spawn is the default)
def get_task_queue():
    return task_queue


def get_result_queue():
    return result_queue


class QueueManager(BaseManager):
    pass


# Register the two queues as an API
QueueManager.register('get_task_queue', callable=get_task_queue)
QueueManager.register('get_result_queue', callable=get_result_queue)

if __name__ == '__main__':
    # Listen on port 5000 with the authentication key b'abc'
    # ('' means all interfaces; on Windows, specify the address, e.g. '127.0.0.1')
    manager = QueueManager(address=('', 5000), authkey=b'abc')
    # Start the server process
    manager.start()
    # Get the queue objects through the network
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # Put some tasks in
    for i in range(10):
        n = random.randint(0, 10000)
        print('Put task {}...'.format(n))
        task.put(n)
    # Receive the results from the result queue
    print('Try get results...')
    for i in range(10):
        # Raises queue.Empty if no result arrives within 10 seconds
        r = result.get(timeout=10)
        print('Result: {}'.format(r))
    # Shut down
    manager.shutdown()
    print('master exit.')
```
Next, implement the worker process, `task_worker.py`. It gets tasks through the **API** `manager.get_task_queue` published above and processes them.
task_worker.py
import time
import queue
from multiprocessing.managers import BaseManager
#Create the same Queue Manager
class QueueManager(BaseManager):
pass
#Get api from the net and register it in Queue Manager
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
#Connect to the server
server_addr = '127.0.0.1'
print('Connect to server {}...'.format(server_addr))
#Set the same port and authentication encryption
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
#Connect
m.connect()
#Get each queue
task = m.get_task_queue()
result = m.get_result_queue()
#Receive a task from the task queue
#Store the processing result in the result queue
for i in range(10):
try:
n = task.get(timeout=1)
#Here, the task is simple square calculation.
print('run task {} * {}...'.format(n, n))
r = '{} * {} = {}'.format(n, n, n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('task queue is empty.')
#End
print('worker exit.')
Both scripts can also be run on a single local machine.
Execution result:
First, the server process puts the tasks into `task_queue`. Once all of them are in, it waits for the results on `result_queue`.
task_master.py
Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...
The worker process then connects to the server, takes the tasks from `task_queue`, and processes them. The results are sent to `result_queue`.
task_worker.py
Connect to server 127.0.0.1...
run task 7710 * 7710...
run task 6743 * 6743...
run task 8458 * 8458...
run task 2439 * 2439...
run task 1351 * 1351...
run task 9885 * 9885...
run task 5532 * 5532...
run task 4181 * 4181...
run task 6093 * 6093...
run task 3815 * 3815...
worker exit.
When results arrive in `result_queue`, the server process prints them in order.
task_master.py
Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...
Result: 7710 * 7710 = 59444100
Result: 6743 * 6743 = 45468049
Result: 8458 * 8458 = 71537764
Result: 2439 * 2439 = 5948721
Result: 1351 * 1351 = 1825201
Result: 9885 * 9885 = 97713225
Result: 5532 * 5532 = 30603024
Result: 4181 * 4181 = 17480761
Result: 6093 * 6093 = 37124649
Result: 3815 * 3815 = 14554225
master exit.
All queues are in the server process because the worker process does not create the queue.
(Source: [Liao Xuefeng's official website](https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600)) In this way, distributed processes can be implemented in Python. Spreading the processing across multiple workers gives you a great deal of computing power.
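When everything runs on one machine, the master and worker roles can also share one script. `task_master.py` registers its queues with lambdas, which cannot be pickled when `start()` has to launch the manager in a fresh process (as on Windows). The minimal sketch below is an illustration, not the original author's code: it uses module-level functions and runs the manager's server in a background thread with `get_server().serve_forever()` instead of `start()`. Port 0 lets the OS choose a free port, and the names `ServerManager`, `ClientManager`, and `run_demo` are hypothetical.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

# The real queues live only in the server (this process).
task_queue = queue.Queue()
result_queue = queue.Queue()


def get_task_queue():
    return task_queue


def get_result_queue():
    return result_queue


class ServerManager(BaseManager):
    pass


class ClientManager(BaseManager):
    pass


# Server side: register each name together with the callable that returns the object.
ServerManager.register('get_task_queue', callable=get_task_queue)
ServerManager.register('get_result_queue', callable=get_result_queue)
# Client side: register the names only, as task_worker.py does.
ClientManager.register('get_task_queue')
ClientManager.register('get_result_queue')


def run_demo(numbers, authkey=b'abc'):
    # Port 0 asks the OS for any free port; server.address holds the real one.
    server = ServerManager(address=('127.0.0.1', 0), authkey=authkey).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()

    # Connect as a worker would and obtain proxies to the server's queues.
    client = ClientManager(address=server.address, authkey=authkey)
    client.connect()
    task = client.get_task_queue()
    result = client.get_result_queue()

    for n in numbers:                 # master role: enqueue the tasks
        task.put(n)
    for _ in numbers:                 # worker role: square each task
        n = task.get(timeout=1)
        result.put('{} * {} = {}'.format(n, n, n * n))
    return [result.get(timeout=1) for _ in numbers]  # master role: collect results


if __name__ == '__main__':
    for line in run_demo([3, 4]):
        print('Result:', line)
```

Every `put` and `get` here travels over a socket to the server thread, exactly as it would between two machines; only the transport endpoint is local.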
As explained earlier, `fork()` makes a copy of the current process as a child process. In other words, calling `os.fork` in Python creates a child process that runs your Python program. Sometimes, however, you need a child process that runs an external command rather than a Python program.
Unix-like operating systems have another system call, `exec()`, which Python implements as `os.execve`. `exec()` replaces the current process with another program. In other words, `os.fork` creates a child process running the Python program, and `os.execve` can replace that process with another program that you could run from a shell, such as `ls` or `ping`.
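A minimal, POSIX-only sketch of the fork-then-exec pattern described above. It uses `os.execvp`, which searches `PATH`, instead of `os.execve`, which needs a full path and an explicit environment; the helper name `fork_exec` is hypothetical.

```python
import os
import sys


def fork_exec(argv):
    """Run argv in a new child process using fork() + exec() (POSIX only).

    Returns the child's exit code, or -1 if it was killed by a signal.
    """
    pid = os.fork()
    if pid == 0:
        # Child: replace this Python process image with the target program.
        try:
            os.execvp(argv[0], argv)
        finally:
            # Only reached if exec failed; exit at once without Python cleanup.
            os._exit(127)
    # Parent: wait for the child and decode its wait status.
    _, status = os.waitpid(pid, 0)
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    return -1


if __name__ == '__main__':
    code = fork_exec([sys.executable, '-c', 'raise SystemExit(3)'])
    print('child exit code:', code)
```

This fork/exec/wait bookkeeping is what `subprocess` handles for you, along with pipes and error reporting.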
The standard library module `subprocess` creates child processes that execute external programs. When you run an external program with `subprocess`, it can build pipes for interprocess communication between the Python process and the child process, so you can pass input to the child and receive its output and errors.
3-1. subprocess.run
Since Python 3.5, the officially recommended way to run a command is `subprocess.run`. The older API, such as `subprocess.call`, is not covered here.
subprocess.run(args, *, stdin=None, input=None,
stdout=None, stderr=None, shell=False, timeout=None, check=False, universal_newlines=False)
`subprocess.run` returns an instance of the `CompletedProcess` class, whose attributes are as follows.
Attribute | Description |
---|---|
args | Arguments used to launch the child process; a string or a list |
returncode | Exit status of the child process (on POSIX, a negative value -N means it was killed by signal N) |
stdout | Captured standard output; None if it was not captured |
stderr | Captured standard error; None if it was not captured |
check_returncode() | Method that raises CalledProcessError if returncode is non-zero (execution failed) |
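A short sketch that touches each attribute in the table. The current interpreter (`sys.executable`) stands in for an external program so the example runs on any OS; the child script is made up for illustration.

```python
import subprocess
import sys

# A tiny Python child that writes to stdout and stderr, then exits with status 2.
cmd = [sys.executable, '-c',
       'import sys; print("out"); print("err", file=sys.stderr); sys.exit(2)']
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

print(proc.args == cmd)   # True: args is what was passed in
print(proc.returncode)    # 2
print(proc.stdout)        # b'out\n' on POSIX (bytes, since text mode was not requested)
print(proc.stderr)        # b'err\n' on POSIX

try:
    proc.check_returncode()  # raises because returncode is non-zero
except subprocess.CalledProcessError as exc:
    print('failed with', exc.returncode)  # failed with 2
```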
Here are some examples of using subprocess.run </ code>.
Pass `stdout=subprocess.PIPE` to capture standard output. Otherwise it is not captured: it goes straight to the parent's standard output and `obj.stdout` is `None`.
import subprocess
# Run `ls -l` and capture its standard output through a pipe
obj = subprocess.run(["ls", "-l"], stdout=subprocess.PIPE)
print('stdout:\n{}'.format(obj.stdout.decode()))
Execution result:
stdout:
total 128
-rw-r--r--@ 1 kaito staff 692 Feb 16 19:35 1-1.py
-rw-r--r--@ 1 kaito staff 509 Feb 17 23:39 1-2.py
-rw-r--r--@ 1 kaito staff 364 Feb 19 16:48 2-10.py
-rw-r--r--@ 1 kaito staff 645 Feb 19 19:12 2-17.py
-rw-r--r--@ 1 kaito staff 213 Feb 19 19:14 2-18.py
-rw-r--r--@ 1 kaito staff 209 Feb 19 19:18 2-19.py
-rw-r--r--@ 1 kaito staff 318 Feb 19 23:53 2-20.py
-rw-r--r--@ 1 kaito staff 194 Feb 19 23:57 2-21.py
-rw-r--r--@ 1 kaito staff 230 Feb 20 15:46 2-23.py
-rw-r--r--@ 1 kaito staff 131 Feb 18 19:39 2-4.py
-rw-r--r--@ 1 kaito staff 543 Feb 18 19:50 2-8.py
-rw-r--r--@ 1 kaito staff 240 Feb 18 22:29 2-9.py
-rw-r--r-- 1 kaito staff 1339 Feb 27 00:25 task_master.py
-rw-r--r-- 1 kaito staff 1086 Feb 27 00:31 task_worker.py
-rw-r--r-- 1 kaito staff 446 Feb 27 20:26 test.py
-rw-r--r-- 1 kaito staff 199 Feb 27 20:31 test2.py
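Since Python 3.7, `subprocess.run` also accepts `capture_output=True` (shorthand for `stdout=subprocess.PIPE, stderr=subprocess.PIPE`) and `text=True` (decode the output to `str`), which removes the manual `.decode()`. A sketch, using the current interpreter as a portable stand-in for `ls -l`:

```python
import subprocess
import sys

obj = subprocess.run(
    [sys.executable, '-c', 'print("hello from child")'],
    capture_output=True,  # same as stdout=subprocess.PIPE, stderr=subprocess.PIPE
    text=True,            # decode bytes to str and normalize newlines
)
print(repr(obj.stdout))   # 'hello from child\n'
```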
If `check=True` is passed, `CalledProcessError` is raised when the exit status is non-zero.
subprocess.run("exit 1", shell=True, check=True)
Execution result:
Traceback (most recent call last):
File "test2.py", line 4, in <module>
subprocess.run("exit 1", shell=True, check=True)
File "/Users/kaito/opt/miniconda3/lib/python3.7/subprocess.py", line 487, in run
output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1.
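To handle the failure instead of letting the traceback end the program, catch `CalledProcessError`; the exception carries the command and its exit status. A minimal sketch with a hypothetical helper `run_checked`:

```python
import subprocess


def run_checked(command):
    """Run a shell command with check=True; report failure instead of crashing."""
    try:
        subprocess.run(command, shell=True, check=True)
    except subprocess.CalledProcessError as exc:
        # exc.cmd is the command as given; exc.returncode is its exit status.
        print('{!r} failed with status {}'.format(exc.cmd, exc.returncode))
        return exc.returncode
    return 0


run_checked('exit 1')  # prints: 'exit 1' failed with status 1
run_checked('exit 0')  # prints nothing
```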
The `__repr__` of the `CompletedProcess` class looks like this.
print(subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE))
Execution result:
CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0, stdout=b'crw-rw-rw- 1 root wheel 3, 2 Feb 27 20:37 /dev/null\n')
3-2. subprocess.Popen
For more advanced control, use the `subprocess.Popen` class.
class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False,
startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=())
The methods of the `subprocess.Popen` class are as follows.
Method | Description |
---|---|
poll | Return the exit status if the child process has finished; return None if it is still running |
wait | Wait for the child process to finish and return its exit status; raise TimeoutExpired if the timeout expires first |
communicate | Send data to stdin, read stdout and stderr until EOF, and wait for the child process to finish |
send_signal | Send a signal to the child process; for example, send_signal(signal.SIGINT) sends the signal that Ctrl+C produces on a UNIX-like command line |
terminate | Terminate the child process (SIGTERM on POSIX) |
kill | Kill the child process (SIGKILL on POSIX) |
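The sketch below exercises `poll`, `wait` with a timeout, and `terminate` from the table. It uses the current Python interpreter (`sys.executable`) as a stand-in long-running child, so it runs without any external command.

```python
import subprocess
import sys

# Start a child that would otherwise sleep for 30 seconds.
p = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(30)'])

print(p.poll())  # None: the child is still running

try:
    p.wait(timeout=0.5)  # give up waiting after half a second
except subprocess.TimeoutExpired:
    print('still running, terminating')
    p.terminate()        # SIGTERM on POSIX, TerminateProcess() on Windows

returncode = p.wait()    # reap the child and get its exit status
print(returncode)        # on POSIX, minus the signal number (-15 for SIGTERM)
```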
Here are some examples of using subprocess.Popen </ code>.
You can also run Python code in a separate `python` process, as an external program.
import subprocess
#Pipe to standard input, standard output, standard error
p = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#Write data to standard input
p.stdin.write(b'print("stdin")\n')
#Pass more input via communicate(), which also closes stdin and waits for the child to exit
out, err = p.communicate(input=b'print("communicate")\n')
print(out.decode())
Execution result:
stdin
communicate
A shell-style pipeline with `|` can be built by connecting the standard output of one child process to the standard input of another with a pipe.
#Pipe the two child processes together
p1 = subprocess.Popen(['df', '-h'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'Data'], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()  # Let p1 receive SIGPIPE if p2 exits first
out, err = p2.communicate() # df -h | grep Data
print(out.decode())
Execution result:
/dev/disk1s1 466Gi 438Gi 8.0Gi 99% 1156881 4881295959 0% /System/Volumes/Data
map auto_home 0Bi 0Bi 0Bi 100% 0 0 100% /System/Volumes/Data/home
Python's evolution doesn't stop there, however. The higher-level package `concurrent`, which wraps `threading` and `multiprocessing` further to make them easier to use, is still being developed.
At the time of writing, `concurrent` contains only one module, `futures`.
`futures` is a Python implementation of the [Future pattern](https://ja.wikipedia.org/wiki/Future_%E3%83%91%E3%82%BF%E3%83%BC%E3%83%B3). Here I introduce the features that are available at this point.
`concurrent.futures` provides `ThreadPoolExecutor` and `ProcessPoolExecutor`, both of which inherit from the `Executor` class.
`ThreadPoolExecutor` and `ProcessPoolExecutor` take an argument called `max_workers` that specifies the number of threads or processes. The `submit` method schedules one task and returns an instance of the `Future` class.
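Instead of polling `future.done()` in a loop, `future.result()` simply blocks until the task finishes (optionally with a `timeout`), and `add_done_callback` runs a function once the future is done. A sketch with `ThreadPoolExecutor` and a hypothetical `square` task, so it runs without network access:

```python
from concurrent.futures import ThreadPoolExecutor
import time


def square(n):
    """Hypothetical stand-in task: sleep briefly, then return n * n."""
    time.sleep(0.1)
    return n * n


finished = []

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(square, 7)
    # Called with the Future as its only argument once the task completes.
    future.add_done_callback(lambda f: finished.append(f.result()))
    value = future.result(timeout=5)  # blocks until the result is ready

# Leaving the with-block joins the worker threads, so the callback has run.
print(value)     # 49
print(finished)  # [49]
```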
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
def load_url(url):
    return requests.get(url)
if __name__ == '__main__':
    url = 'https://www.python.org/'
    executor = ProcessPoolExecutor(max_workers=4) # ThreadPoolExecutor(max_workers=4)
    future = executor.submit(load_url, url)
    print(future)
    # Poll until the task is done (future.result() alone would simply block)
    while 1:
        if future.done():
            print('status code: {}'.format(future.result().status_code))
            break
    # Release the worker processes
    executor.shutdown()
Execution result:
<Future at 0x10ae058d0 state=running>
status code: 200
This makes a simple HTTP request. Note that `ProcessPoolExecutor` requires the `__main__` module to be importable by its worker processes, so it does not work in a REPL.
> The `__main__` module must be importable by worker subprocesses. This means that `ProcessPoolExecutor` will not work in the interactive interpreter.
The `submit` method runs only one task, so to run multiple tasks use the `map` method or the module-level functions `as_completed` and `wait`.
The `map` method takes a function and an iterable as arguments and returns an iterator over the results, in the same order as the inputs.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']
def load_url(url):
    return requests.get(url)
if __name__ == '__main__':
    # with ThreadPoolExecutor(max_workers=4) as executor:
    with ProcessPoolExecutor(max_workers=4) as executor:
        for url, data in zip(URLS, executor.map(load_url, URLS)):
            print('{} - status_code {}'.format(url, data.status_code))
Execution result:
https://google.com - status_code 200
https://www.python.org/ - status_code 200
https://api.github.com/ - status_code 200
The `as_completed` function returns an iterator that yields `Future` objects as they complete (finish or are cancelled). It blocks while none of the remaining tasks has completed yet.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import requests
URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']
def load_url(url):
    return url, requests.get(url).status_code
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        for future in as_completed(tasks):
            print(*future.result())
Execution result:
https://google.com 200
https://www.python.org/ 200
https://api.github.com/ 200
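The `wait` function blocks the calling thread (here, the main thread of the main process) until the condition given by the `return_when` argument is met, then returns the finished and unfinished futures as two sets. Three conditions are available: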
Condition | Description |
---|---|
ALL_COMPLETED | Return when all tasks have finished or been cancelled |
FIRST_COMPLETED | Return as soon as any task finishes or is cancelled |
FIRST_EXCEPTION | Return when any task finishes by raising an exception; if none raises, this behaves like ALL_COMPLETED |
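A sketch of `FIRST_COMPLETED` and of the `(done, not_done)` pair that `wait` returns. It uses threads and a hypothetical `sleep_and_return` task with different delays, so which task finishes first is predictable.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time


def sleep_and_return(seconds):
    """Hypothetical task: sleep for the given time, then return it."""
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(sleep_and_return, 0.1)
    slow = executor.submit(sleep_and_return, 1.0)
    # wait() returns a named tuple of two sets: (done, not_done).
    done, not_done = wait([fast, slow], return_when=FIRST_COMPLETED)
    print(fast in done, slow in not_done)  # True True
# Leaving the with-block still waits for the slow task to finish.
```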
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED
import requests
URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']
def load_url(url):
    requests.get(url)
    print(url)
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        wait(tasks, return_when=ALL_COMPLETED)
        print('all completed.') # Printed only after all three URLs have been printed
Execution result:
https://www.python.org/
https://api.github.com/
https://google.com
all completed.
Concurrency
- threading --- Thread-based parallelism
- multiprocessing --- Process-based parallelism
- subprocess --- Subprocess management
- concurrent.futures --- Concurrent task execution