Die wichtigsten modernen Betriebssysteme sind Mac OS, UNIX, Linux und Windows. Diese Betriebssysteme unterstützen die Funktion "Multitasking".
Was ist Multitasking? Sie könnten beispielsweise denken, dass in einer Situation, in der Sie einen Browser starten, Musik hören und einen Bericht in Word schreiben, mindestens drei Aufgaben gleichzeitig ausgeführt werden. Und neben den Aufgaben auf der Vorderseite laufen heimlich verschiedene Aufgaben im Zusammenhang mit dem Betriebssystem hinter den Kulissen.
Es ist leicht zu verstehen, dass eine Multi-Core-CPU Multitasking verarbeiten kann, eine Single-Core-CPU jedoch auch Multitasking. Das Betriebssystem führt abwechselnd jede Aufgabe aus. Beispielsweise beträgt Aufgabe 1 0,01 Sekunden, Aufgabe 2 0,01 Sekunden, Aufgabe 3 0,01 Sekunden, Aufgabe 1 0,01 Sekunden usw. Die CPU ist schnell und fühlt sich fast gleichzeitig an. Diese alternative Ausführung wird häufig als ["Concurrent Computing"] bezeichnet (https://ja.wikipedia.org/wiki/%E4%B8%A6%E8%A1%8C%E8%A8%88%E7 % AE% 97).
Natürlich werden Single-Core-CPUs nacheinander ausgeführt, so dass im wahrsten Sinne des Wortes ein gleichzeitiger Fortschritt nur für Multi-Core-CPUs möglich ist. Die Verarbeitung mehrerer Aufgaben auf jedem Kern gleichzeitig mit einer Mehrkern-CPU ist "[Parallel Computing](https://ja.wikipedia.org/wiki/%E4%B8%A6%E5%] 88% 97% E8% A8% 88% E7% AE% 97) ". In den meisten Fällen übersteigt die Anzahl der ausgeführten Aufgaben die Anzahl der Kerne bei weitem, sodass die Arbeit zur "Schichtausführung" auch im Mehrkern ausgeführt wird.
Für das Betriebssystem ist eine Aufgabe ein Prozess. Wenn Sie beispielsweise einen Browser starten, wird ein einzelner Browserprozess erstellt. Ebenso wird beim Öffnen von Word ein Word-Prozess erstellt.
Ein Prozess ist nicht unbedingt ein Prozess. Beispielsweise führt Word viele Verarbeitungsschritte durch, z. B. die Überwachung von Benutzereingaben, die Rechtschreibprüfung und die Anzeige der Benutzeroberfläche. Diese "Unteraufgaben" werden als Threads bezeichnet. Es gibt mindestens einen Thread pro Prozess. Wenn mehrere Threads vorhanden sind, wechseln sie sich wie Prozesse ab.
Es gibt zwei Möglichkeiten, Multitasking in Python gleichzeitig zu verarbeiten.
Natürlich können Sie mehrere Threads in mehreren Prozessen haben, dies wird jedoch aufgrund der Komplexität des Modells nicht empfohlen.
Bei der Verarbeitung von Mehrfachaufgaben kann die Kommunikation und Zusammenarbeit zwischen Aufgaben erforderlich sein, Aufgabe 1 muss möglicherweise angehalten werden, wenn Aufgabe 2 ausgeführt wird, oder Aufgabe 3 und Aufgabe 4 können möglicherweise nicht gleichzeitig fortfahren. Das Programm wird etwas kompliziert.
(Quelle: Überblick über die Vorlesung zur Systemsoftware)
Funktion | Erläuterung |
---|---|
start() | Starten Sie einen Thread |
setName() | Gib dem Thread einen Namen |
getName() | Holen Sie sich den Namen des Threads |
setDaemon(True) | FadenDaemonZu |
join() | Warten Sie, bis der Thread die Verarbeitung abgeschlossen hat |
run() | Führen Sie die Thread-Verarbeitung manuell aus |
Python-Threads werden nicht prozesssimuliert, sondern sind echte POSIX-Threads 83% E3% 83% 89). In der Standardbibliothek können Sie zwei Module verwenden: _thread </ code> und
threading </ code>. Und
_thread </ code> ist ein Modul auf niedriger Ebene, und
threading </ code> ist ein Modul, das es kapselt. Daher verwende ich normalerweise
Threading </ code>.
Sie können einen Thread starten, indem Sie eine Funktion usw. einführen, um eine Instanz von Thread </ code> zu erstellen und mit
start </ code> zu starten.
import threading
import time
def run(n):
# threading.current_thread().Name ist getName()Anruf
print("task: {} (thread name: {})".format(n, threading.current_thread().name))
time.sleep(1)
print('2s')
time.sleep(1)
print('1s')
time.sleep(1)
print('0s')
time.sleep(1)
t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",), name='Thread T2') #Hier setName()Wird genannt
# start()
t1.start()
t2.start()
# join()
t1.join()
t2.join()
# join()Weil ich angerufen habe
#Der Haupt-Thread wartet, bis der obige Thread abgeschlossen ist
#Drucken, wenn alles erledigt ist
print(threading.current_thread().name)
Ausführungsergebnis:
task: t1 (thread name: Thread-1)
task: t2 (thread name: Thread T2)
2s
2s
1s
1s
0s
0s
MainThread
Sie können sehen, dass t1 und t2 abwechselnd ausgeführt werden. Eine der Wechselregeln wird in GIL nach der E / A-Operation (wo die Operation print </ code> gilt) ausführlicher erläutert.
Es ist auch möglich, Thread </ code> zu erben und die
run </ code> -Methode der Thread-Klasse anzupassen.
import threading
import time
class MyThread(threading.Thread):
def __init__(self, n):
super(MyThread, self).__init__()
self.n = n
# run()Umschreiben
def run(self):
print("task: {}".format(self.n))
time.sleep(1)
print('2s')
time.sleep(1)
print('1s')
time.sleep(1)
print('0s')
time.sleep(1)
t1 = MyThread("t1")
t2 = MyThread("t2")
t1.start()
t2.start()
Ausführungsergebnis:
task: t1
task: t2
2s
2s
1s
1s
0s
0s
Sie können die Anzahl der aktiven Threads mit active_count </ code> zählen. In einer REPL-Umgebung müssen jedoch mehrere Threads überwacht werden, sodass die Anzahl der Threads höher als erwartet ist.
Führen Sie den folgenden Code in einem Skript aus.
import threading
import time
def run(n):
print("task: {}".format(n))
time.sleep(1)
for i in range(3):
t = threading.Thread(target=run, args=("t{}".format(i),))
t.start()
time.sleep(0.5)
print(threading.active_count())
Ausführungsergebnis:
task: t0
task: t1
task: t2
4
Wenn der Hauptthread print </ code> ausgeführt wird, ist die Anzahl der Threads = 3 + 1 (Hauptthread), da andere Threads noch ausgeführt werden.
import threading
import time
def run(n):
print("task: {}".format(n))
time.sleep(0.5)
for i in range(3):
t = threading.Thread(target=run, args=("t{}".format(i),))
t.start()
time.sleep(1)
print(threading.active_count())
Ausführungsergebnis:
task: t0
task: t1
task: t2
1
Durch Anpassen der Ausführungszeit und Verzögern des -Drucks </ code> des Hauptthreads wird die Anzahl der aktiven Threads nur für den Hauptthread zu 1.
Starten Sie den Thread als Daemon.
import threading
import time
def run(n):
print("task: {}".format(n))
time.sleep(1)
print('3')
time.sleep(1)
print('2')
time.sleep(1)
print('1')
for i in range(3):
t = threading.Thread(target=run, args=("t{}".format(i),))
# setDaemon(True)
t.setDaemon(True)
t.start()
time.sleep(1.5)
print('Anzahl der Themen: {}'.format(threading.active_count()))
Ausführungsergebnis:
task: t0
task: t1
task: t2
3
3
3
Anzahl der Themen: 4
Da t1, t2 und t3 als Daemon-Thread des Haupt-Threads festgelegt sind, werden sie gestoppt, wenn der Haupt-Thread endet. Beispielsweise ist die Rechtschreibprüfung von Word ein Daemon-Thread, der in einer Endlosschleife ausgeführt wird. Wenn der Haupt-Thread jedoch ausfällt, wird er ausgefallen. 1-5. GIL Bei Verwendung einer Multi-Core-CPU in anderen Programmiersprachen können Threads mit der Anzahl der Kerne gleichzeitig ausgeführt werden. In Python wird jedoch jeweils nur ein Thread ausgeführt. Dies ist ein Prozess. Mit anderen Worten, Python-Multithreading ist vollständig parallel. Der Grund ist [GIL (Global Interpreter Lock)](https://ja.wikipedia.org/wiki/%E3%82%B0%E3%83%AD%E3%83%BC%E3%83%90%E3 % 83% AB% E3% 82% A4% E3% 83% B3% E3% 82% BF% E3% 83% 97% E3% 83% AA% E3% 82% BF% E3% 83% AD% E3% 83 Es befindet sich bei% 83% E3% 82% AF).
GIL ist eine Art exklusive Kontrolle (wird später erklärt). Als Python zum ersten Mal entwickelt wurde, haben wir GIL implementiert, um die Kombination mit Datensicherheits- und C-Sprachbibliotheken zu vereinfachen. Wenn Sie einen Thread ausführen, benötigen Sie eine GIL. In einem Python-Interpreter gibt es nur einen Python-Prozess. Und da es in einem Python-Prozess nur eine GIL gibt, kann jeweils nur ein Thread ausgeführt werden. GIL ist wie ein Pass, und Threads ohne GIL können nicht in die CPU gelangen. GIL ist übrigens in CPython (normale Python-Distribution), aber nicht in PyPy und Jython. Eine andere bekannte Sprache mit GIL ist Ruby.
sys.setcheckinterval </ code> festlegen
Lassen Sie uns als Experiment eine einfache Endlosschleife ausführen.
import threading
import multiprocessing
def loop():
x = 0
while True:
x = x ^ 1
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
Wie Sie sehen können, liegt die CPU-Auslastung aufgrund von GIL in einem einzigen Prozess nur bei etwa 100%, unabhängig davon, wie sehr Sie es versuchen (auf einer Quad-Core-CPU sollten bis zu 400% verfügbar sein).
Ressourcen werden von Threads im selben Prozess gemeinsam genutzt. Und weil die Threads zufällig und nicht in der richtigen Reihenfolge gewechselt werden, können die Daten durcheinander geraten.
import threading
#Geld sparen
balance = 0
def change_it(n):
#Auszahlung und Einzahlung sollten 0 sein
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
Wie Sie sehen können, indem Sie den obigen Code mehrmals ausführen, ist das Ergebnis ungleich Null.
balance = balance + n </ code> kann in zwei atomare Operationen aufgeteilt werden.
x = balance + n
balance = x
Das x </ code> ist hier eine lokale Variable, und jeder Thread hat seinen eigenen
x </ code>. Wenn der obige Code der Reihe nach ausgeführt wird, wird er wie folgt.
balance = 0 #Ursprünglicher Wert
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2 # balance = 0
balance = 0 #Das Ergebnis ist korrekt
Wenn die Reihenfolge jedoch unterschiedlich ist, ist das Ergebnis unterschiedlich.
balance = 0 #Ursprünglicher Wert
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance - 8 # x2 = 0 - 8 = -8
t2: balance = x2 # balance = -8
balance = -8 #Das Ergebnis ist falsch
Dieses Phänomen der unvorhersehbaren Berechnung führt zu Multithreading und wird als Thread-unsicher bezeichnet.
Um dies zu lösen, müssen Sie den Thread sperren und steuern.
import threading
#Geld sparen
balance = 0
def change_it(n):
#Holen Sie sich das Schloss
lock.acquire()
global balance
balance = balance + n
balance = balance - n
#Lassen Sie das Schloss los
lock.release()
def run_thread(n):
for i in range(100000):
change_it(n)
lock = threading.Lock() #Instanziieren Sie eine Sperre
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
Durch die Verwendung der exklusiven Steuerung kann kein anderer Thread auf die Ressource zugreifen, bis die Sperre aufgehoben wird. Auf diese Weise ist das Berechnungsergebnis immer 0.
Exklusive Steuerung, mit der verschachtelte Sperren rekursiv freigegeben werden können.
import threading
#Geld sparen
balance = 0
def add_it(n):
lock.acquire()
global balance
balance = balance + n
return balance
def sub_it(n):
lock.acquire()
global balance
balance = balance - n
return balance
def change_it(n):
#Holen Sie sich das Schloss
lock.acquire()
global balance
balance = add_it(n)
balance = sub_it(n)
#Sperre rekursiv aufheben
lock.release()
def run_thread(n):
for i in range(1000):
change_it(n)
lock = threading.RLock() #Instanziieren Sie eine Sperre
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
Hier wird die Sperre auch in add_it </ code> und
sub_it </ code> erworben. Durch die Verwendung der rekursiven exklusiven Steuerung ist es nicht erforderlich, jede Sperre aufzuheben, und alle können auf einmal freigegeben werden. Es ist jedoch sehr rechenintensiv, sodass wir die Anzahl der Schleifen reduzieren.
Mit der exklusiven Kontrolle können Ressourcen zu einem bestimmten Zeitpunkt nur von einem Thread verarbeitet werden, während Semapho eine Grenze darstellt, die die gleichzeitige Verarbeitung einer bestimmten Anzahl von Threads ermöglicht. Zum Beispiel ist eine Situation, in der drei Toilettensitze in der Toilette sind, drei Personen sie gleichzeitig benutzen und andere in der Schlange stehen, ein Semapho.
import threading
import time
def run(n):
semaphore.acquire()
time.sleep(1)
print("current thread: {}\n".format(n))
semaphore.release()
semaphore = threading.BoundedSemaphore(5) #Ermöglicht die gleichzeitige Verarbeitung von 5 Threads
for i in range(22):
t = threading.Thread(target=run, args=("t-{}".format(i),))
t.start()
while threading.active_count() != 1:
pass # print threading.active_count()
else:
print('-----Alle Threads sind beendet-----')
Wenn Sie den obigen Code ausführen, können Sie sehen, dass die aktuellen Thread-Zeichenfolgen fünf mal fünf ausgegeben werden.
Thread-Ereignisse dienen dazu, dass der Haupt-Thread andere Threads steuert. Die folgenden Methoden werden für Ereignis </ code> bereitgestellt.
Methode | Erläuterung |
---|---|
clear | Setzen Sie das Flag auf False |
set | Setzen Sie das Flag auf True |
is_set | Gibt True zurück, wenn das Flag True ist |
wait | Überwachen Sie die Flagge weiter und blockieren Sie sie, wenn die Flagge falsch ist |
import threading
import time
event = threading.Event()
def lighter():
'''
flag=True:Grünes Licht
flag=False:Rotlicht
'''
count = 0
event.set() #Der Anfangswert ist grünes Licht
while True:
if 5 < count <= 10:
event.clear() #Mach ein rotes Licht
print("\33[41;1m rotes Licht...\033[0m")
elif count > 10:
event.set() #Machen Sie grünes Licht
count = 0
else:
print("\33[42;1m grünes Licht...\033[0m")
time.sleep(1)
count += 1
def car(name):
while True:
if event.is_set(): #Überprüfen Sie, ob das grüne Licht leuchtet
print("[{}]Voraus...".format(name))
time.sleep(1)
else:
print("[{}]Warten Sie wegen des roten Lichts auf das Signal...".format(name))
event.wait()
# flag=Blockiere hier bis True
print("[{}]Bewegen Sie sich aufgrund von grünem Licht vorwärts...".format(name))
light = threading.Thread(target=lighter,)
light.start()
car = threading.Thread(target=car, args=("MINI",))
car.start()
Mit dem obigen Code haben wir auf der Veranstaltung eine einfache Kommunikation zwischen der Ampel und dem Faden des Autos realisiert.
Sie können auch einen Timer verwenden, um den Thread nach Zeit zu steuern.
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello)
t.start() #Hallo wird nach 1 Sekunde ausgeführt
Es gibt auch eine Methode, um den Thread durch Beurteilen des Zustands zu steuern. Die folgenden Methoden werden unter Bedingung </ code> bereitgestellt.
Methode | Erläuterung |
---|---|
wait | Hängen Sie den Thread, bis er benachrichtigt wird oder wenn die Zeitüberschreitung des Arguments erreicht ist |
notify | Hung Thread (Standard n=Benachrichtigen 1); kann nur mit dem erworbenen Schloss verwendet werden |
notifyAll | Benachrichtigen Sie alle hängenden Threads |
import threading
import time
from random import randint
from collections import deque
class Producer(threading.Thread):
def run(self):
global stocks
while True:
if lock_con.acquire():
products = [randint(0, 100) for _ in range(5)]
stocks.extend(products)
print('Produzent{}Ist{}Produziert.'.format(self.name, stocks))
lock_con.notify()
lock_con.release()
time.sleep(3)
class Consumer(threading.Thread):
def run(self):
global stocks
while True:
lock_con.acquire()
if len(stocks) == 0:
#Warten Sie, bis es produziert wird, wenn das Produkt leer ist
#Hängen Sie den Thread, bis er benachrichtigt wird
lock_con.wait()
print('Kunde{}Ist{}Gekauft. Lager: {}'.format(self.name, stocks.popleft(), stocks))
lock_con.release()
time.sleep(0.5)
stocks = deque()
lock_con = threading.Condition()
p = Producer()
c = Consumer()
p.start()
c.start()
Ausführungsergebnis:
Produzententhread-1 ist deque([73, 2, 93, 52, 21])Produziert.
Kunden-Thread-2 kaufte 73. Lager: deque([2, 93, 52, 21])
Kunden-Thread-2 gekauft 2. Lager: deque([93, 52, 21])
Kunden-Thread-2 kaufte 93. Lager: deque([52, 21])
Kunden-Thread-2 kaufte 52. Lager: deque([21])
Kunden-Thread-2 kaufte 21. Lager: deque([])
Produzententhread-1 ist deque([6, 42, 85, 56, 76])Produziert.
Kunden-Thread-2 kaufte 6. Lager: deque([42, 85, 56, 76])
Kunden-Thread-2 kaufte 42. Lager: deque([85, 56, 76])
Kunden-Thread-2 kaufte 85. Lager: deque([56, 76])
Kunden-Thread-2 kaufte 56. Lager: deque([76])
Kunden-Thread-2 kaufte 76. Lager: deque([])
Es ist ein einfaches Programm, bei dem der Hersteller 5 Produkte produziert, wenn der Kunde den gesamten Bestand kauft.
Es ist eine Steuerung, die gemeinsam ausgeführt wird, wenn die angegebene Anzahl von Threads die Barriere passiert. In einem Online-Spiel können Sie beispielsweise eine Barriere implementieren, die eine bestimmte Zeit wartet, bis das Team eine bestimmte Anzahl von Personen erreicht. Die folgenden Methoden werden in Barrier </ code> bereitgestellt.
Methode | Erläuterung |
---|---|
wait | Threads passieren die Barriere. Nachdem die angegebene Anzahl von Threads passiert ist, werden alle wartenden Threads freigegeben |
reset | Leeren Sie die Barriere und geben Sie BrokenBarrierError an den wartenden Thread zurück |
abort | Brach die Barriere in einen gebrochenen Zustand, alle aktuellen Threads wurden beendet und BrokenBarrierError an Threads zurückgegeben, die danach versuchen, die Barriere zu passieren |
import threading
num = 4
def start():
print('{}Seit ich eine Person bin, hat das Spiel begonnen.'.format(num))
lock = threading.Lock()
barrier = threading.Barrier(num, action=start)
class Player(threading.Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def run(self):
try:
if not barrier.broken:
print('{}Teilgenommen.'.format(self.name))
barrier.wait(2)
except threading.BrokenBarrierError:
print('Weil das Spiel nicht gestartet werden kann{}Hat verlassen.'.format(self.name))
players = []
for i in range(10):
lock = threading.Lock()
p = Player(name='Player {}'.format(i))
players.append(p)
for p in players:
p.start()
Ausführungsergebnis
Spieler 0 hat teilgenommen.
Spieler 1 hat teilgenommen.
Spieler 2 hat teilgenommen.
Spieler 3 hat teilgenommen.
Das Spiel begann, weil es 4 Personen gab.
Spieler 4 hat teilgenommen.
Spieler 5 hat teilgenommen.
Spieler 6 hat teilgenommen.
Spieler 7 hat teilgenommen.
Das Spiel begann, weil es 4 Personen gab.
Spieler 8 hat teilgenommen.
Spieler 9 hat teilgenommen.
Spieler 8 ist gegangen, weil das Spiel nicht gestartet werden kann.
Spieler 9 ist gegangen, weil das Spiel nicht gestartet werden kann.
Threads werden zufällig ausgeführt, sodass sie nicht immer in der oben angegebenen Reihenfolge ausgegeben werden. Hier mussten die Teams von Spieler 8 und Spieler 9 (Barrieren) das Team verlassen (BrokenBarrierError), da sie die angegebene Anzahl nicht rechtzeitig erreichten.
1-7. ThreadLocal Ich habe erklärt, dass Sie die Daten zwischen Threads gemeinsam nutzen müssen, um sie zu berechnen und die genaue Ausgabe zu berechnen. Es gibt jedoch Situationen, in denen jeder Thread seine eigenen lokalen Variablen verarbeiten soll.
import threading
#Erstellen Sie ein ThreadLocal-Objekt im globalen Bereich
local_school = threading.local()
def process_student():
#Verdiene dir Schüler, die sich auf den aktuellen Thread beziehen
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
#Binden Sie den Namen an den Schüler in ThreadLocal
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
Ausführungsergebnis:
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
Die local_school </ code> ist hier eine globale Variable. Da es sich jedoch um ein
ThreadLocal </ code> -Objekt handelt, kann die Instanzvariable
student </ code> festgelegt werden, ohne dass sich die einzelnen Threads gegenseitig beeinflussen. Sie können es bedienen. Sie können
local_school </ code> als Wörterbuch betrachten und
Lehrer </ code> sowie
Schüler </ code> binden. Und jeder Thread kann beliebig arbeiten und beeinflusst sich nicht gegenseitig. Bei Verwendung von
ThreadLocal </ code> können Sie für jeden Thread eine eigene DB-Verbindung, http-Anforderung usw. erstellen. Aus der Sicht eines Threads sind alle empfangenen Daten wie eine lokale Variable und können von jedem anderen Thread bearbeitet werden.
fork () </ code> erstellen. Durch Aufrufen von fork () </ code> wird der aktuelle Prozess kopiert. Der kopierte Prozess wird als untergeordneter Prozess bezeichnet, und der ursprüngliche Prozess wird zum übergeordneten Prozess. Der Rückgabewert von fork () </ code> wird sowohl an den untergeordneten als auch an den übergeordneten Prozess zurückgegeben. Der Rückgabewert des untergeordneten Prozesses ist 0, und die ID des untergeordneten Prozesses wird im übergeordneten Prozess zurückgegeben. Der Grund ist, dass der übergeordnete Prozess die ID des untergeordneten Prozesses aufzeichnen muss. Sie können die ID des übergeordneten Prozesses vom untergeordneten Prozess mit getppid </ code> abrufen.
Das Python OS </ code> -Modul kapselt das Systemaufrufsystem.
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
Ausführungsergebnis:
Process (19148) start...
I (19148) just created a child process (19149).
I am child process (19149) and my parent is 19148.
Hier geben der übergeordnete Prozess und der untergeordnete Prozess unterschiedliche bedingte Zweige ein. Bitte beachten Sie, dass Windows keinen Systemaufruf fork () </ code> hat und nicht ausgeführt werden kann.
Mit fork () </ code> kann ein neuer Prozess erstellt und verarbeitet werden, wenn ein Prozess eine neue Aufgabe übernimmt. Mit dem bekannten Apache-Server kann der übergeordnete Prozess beispielsweise den Port und
fork () </ code> überwachen, damit der untergeordnete Prozess neue http-Anforderungen verarbeiten kann.
Beim Schreiben von Python-Multiprozessprogrammen wird empfohlen, das Modul multiprocessing </ code> der Standardbibliothek zu verwenden. Das
Multiprocessing </ code> -Modul ist ein Modul, das parallel verarbeitet werden kann. Es wird auch gesagt, dass das Modul
Multiprocessing </ code> implementiert wurde, da das Modul
threading </ code> aufgrund von GIL nicht parallel verarbeitet werden kann.
Das Multiprocessing </ code> -Modul ist auch plattformübergreifend, sodass Sie Multiprozessprogramme unter Windows erstellen können. Wie oben erwähnt, verfügt Windows nicht über
fork () </ code>. Wenn Sie also einen Prozess mit dem
Multiprocessing </ code> -Modul erstellen, wird eine Pseudo
fork () </ code> erstellt Ich verarbeite. Die Möglichkeit hierfür besteht darin, alle Python-Objekte im übergeordneten Prozess mit
Pickle </ code> zu serialisieren und an den untergeordneten Prozess zu übergeben. Wenn der Aufruf des
Multiprocessing </ code> -Moduls unter Windows fehlschlägt, ist die
Pickle </ code> möglicherweise fehlgeschlagen.
Wenn Sie einen untergeordneten Prozess erstellen und einen externen Befehl ausführen möchten, können Sie den Unterprozess der Standardbibliothek </ code> verwenden. Hier wird jedoch zunächst die Python-Verarbeitung im Multiprozessmodul
multiprocessing </ code> ausgeführt. Ich werde die Funktion vorstellen.
Sie können untergeordnete Prozesse einfach mithilfe von Prozessen erstellen.
from multiprocessing import Process
import os
#Was der untergeordnete Prozess tut
def run_proc(name):
print('Run child process {} ({})...'.format(name, os.getpid()))
print('Parent process {}.'.format(os.getpid()))
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
Ausführungsergebnis:
Parent process 19218.
Child process will start.
Run child process test (19219)...
Child process end.
Übergeben Sie die Ausführungsfunktion und die Argumente an Process </ code>, erstellen Sie eine Instanz und starten Sie sie mit
start </ code>. Sie können ganz einfach einen untergeordneten Prozess aus
fork () </ code> erstellen. Wenn Sie hier
join </ code> verwenden, wartet der übergeordnete Prozess, bis die Ausführung des untergeordneten Prozesses abgeschlossen ist, genau wie wenn es sich um einen Thread handelt.
Das Erstellen eines untergeordneten Prozesses ist sehr rechenintensiv. Wenn Sie also eine große Anzahl von Prozessen erstellen möchten, ist es effizienter, einen Prozesspool mit Pool </ code> zu erstellen. Die Hauptmethoden von
Pool </ code> sind wie folgt.
Methode | Erläuterung |
---|---|
apply | Synchrone Verarbeitung |
apply_async | Asynchrone Verarbeitung |
terminate | Beenden Sie sofort |
join | Der übergeordnete Prozess wartet darauf, dass der untergeordnete Prozess die Verarbeitung beendet. Die Prozessverknüpfung kann erst nach dem Schließen oder Beenden erfolgen |
close | Beenden Sie, wenn alle Prozesse abgeschlossen sind |
from multiprocessing import Pool
import os
import time
import random
def long_time_task(name):
print('Run task {} ({})...'.format(name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task {} runs {} seconds.'.format(name, (end - start)))
print('Parent process {}.'.format(os.getpid()))
p = Pool(4) #Bis zu 4 untergeordnete Prozesse gleichzeitig
for i in range(5):
p.apply_async(long_time_task, args=(i,))
#Aufgrund der asynchronen Verarbeitung muss der übergeordnete Prozess nicht auf die Verarbeitung des untergeordneten Prozesses warten.
#Mach den nächsten Druck
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
Ausführungsergebnis:
Parent process 19348.
Waiting for all subprocesses done...
Run task 0 (19349)...
Run task 1 (19350)...
Run task 2 (19351)...
Run task 3 (19352)...
Task 1 runs 0.8950300216674805 seconds.
Run task 4 (19350)...
Task 2 runs 1.0132842063903809 seconds.
Task 4 runs 0.3936619758605957 seconds.
Task 3 runs 2.3689510822296143 seconds.
Task 0 runs 2.776203155517578 seconds.
All subprocesses done.
Da die Poolgröße 4 beträgt, wird Task 4 ausgeführt, nachdem Task 0 bis Task 3 abgeschlossen sind.
Im Gegensatz zu Threads werden Daten nicht zwischen Prozessen geteilt. Das Betriebssystem bietet viele Methoden der prozessübergreifenden Kommunikation. Multiprocessing </ code> kapselt Funktionen des Betriebssystems auf niedriger Ebene, um die Verwendung zu vereinfachen.
FIFO-Datenstrukturwarteschlangen werden häufig für die Kommunikation zwischen Prozessen verwendet.
from multiprocessing import Process, Queue
import os
import time
import random
#Daten in die Warteschlange schreiben
def write(q):
print('Process to write: {}'.format(os.getpid()))
for value in ['A', 'B', 'C']:
print('Put {} to queue...'.format(value))
q.put(value)
time.sleep(random.random())
#Daten aus der Warteschlange lesen
def read(q):
print('Process to read: {}'.format(os.getpid()))
while True:
value = q.get(True)
print('Get {} from queue.'.format(value))
#Der übergeordnete Prozess erstellt eine Warteschlange und übergibt sie an den untergeordneten Prozess
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
#Starten Sie pw und beginnen Sie zu schreiben
pw.start()
#Starten Sie pr und beginnen Sie zu lesen
pr.start()
#Warten Sie, bis pw fertig ist
pw.join()
#pr ist eine Endlosschleife, also töte
pr.terminate()
Ausführungsergebnis:
Process to write: 19489
Put A to queue...
Process to read: 19490
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Selbst wenn der Messwert langsam ist, kann er aufgrund des FIFO in der richtigen Reihenfolge abgerufen werden.
Wie der Name schon sagt, können Pipes als rohrförmige Datenstrukturen betrachtet werden. Daten werden übertragen, indem Daten auf einer Seite der Pipe abgelegt werden ( send </ code> -Methode) und Daten auf der anderen Seite empfangen werden (
recv </ code> -Methode). Bitte beachten Sie, dass Daten beschädigt werden können, wenn zwei Prozesse gleichzeitig Daten desselben Typs speichern oder empfangen.
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
Ausführungsergebnis:
[42, None, 'hello']
Ich erklärte, dass die Daten zwischen Prozessen nicht geteilt werden, aber es ist tatsächlich eine Lüge ...
In Abhängigkeit vom Betriebssystem [gemeinsam genutzter Speicher] zwischen Prozessen (https://ja.wikipedia.org/wiki/%E5%85%B1%E6%9C%89%E3%83%A1%E3%83%A2% E3% 83% AA #% E3% 82% BD% E3% 83% 95% E3% 83% 88% E3% 82% A6% E3% 82% A7% E3% 82% A2% E3% 81% AB% E3 % 82% 88% E3% 82% 8B% E5% 85% B1% E6% 9C% 89% E3% 83% A1% E3% 83% A2% E3% 83% AA) können hergestellt werden. In Python können Sie mit Value </ code> und
Array </ code> numerische Daten und Array-Daten im gemeinsam genutzten Speicher speichern. Abgesehen davon verwenden
Value </ code> und
Array </ code> die Datenstruktur der C-Sprache so wie sie ist. Pythons Zahlen (die die Zahlenklasse erben) ist unveränderlich und kann nicht direkt umgeschrieben werden. ..
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
num = Value('d', 0.0) #doppelte Typennummer
arr = Array('i', range(10)) #Array
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
Ausführungsergebnis:
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Es kann genauer sein zu sagen, dass der Manager die Daten teilt, anstatt sie zu übermitteln. Manager () </ code> gibt ein Managerobjekt zurück und erstellt einen Serverprozess. Über den Serverprozess können andere Prozesse auf Proxy-Weise mit Python-Objekten arbeiten. Manager-Objekte unterstützen Python-Objekte
list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array </ code>.
from multiprocessing import Process, Manager
def f(d, l, i):
d[i] = i
d[str(i)] = str(i)
l.append(i)
print(l)
with Manager() as manager:
shared_dict = manager.dict()
shared_list = manager.list()
p_list = []
#Erstellen Sie 10 Prozesse
for i in range(10):
p = Process(target=f, args=(shared_dict, shared_list, i))
p.start()
p_list.append(p)
for p in p_list:
p.join()
print('All subprocesses done.')
print(shared_dict)
print(shared_list)
Ausführungsergebnis:
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6, 8]
[0, 1, 2, 3, 4, 5, 6, 8, 7]
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]
All subprocesses done.
{0: 0, '0': '0', 1: 1, '1': '1', 2: 2, '2': '2', 3: 3, '3': '3', 4: 4, '4': '4', 5: 5, '5': '5', 6: 6, '6': '6', 8: 8, '8': '8', 7: 7, '7': '7', 9: 9, '9': '9'}
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]
Ich habe versucht, eine Liste und ein Wörterbuch für die Freigabe zwischen Prozessen im Manager zu erstellen. Hier sehen Sie, dass die Prozesse nicht aufeinander folgen.
Prozesse haben wie Threads Sperren.
from multiprocessing import Process, Lock
def f(i):
lock.acquire()
try:
print('hello world', i)
finally:
lock.release()
lock = Lock()
for num in range(10):
Process(target=f, args=(num,)).start()
Ausführungsergebnis:
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9
Aufgrund der Sperre werden die Zahlen im Gegensatz zur vorherigen Zeit in der angegebenen Reihenfolge ausgegeben. Sie können jedoch die Leistung von Multiprozessen nicht demonstrieren.
Python-Prozesse können auf mehreren Computern verarbeitet werden. Das Submodul manager </ code> des Moduls
multiprocessing </ code> kann Prozesse auf mehrere Maschinen verteilen. Auch wenn Sie das Kommunikationsprotokoll nicht kennen, können Sie ein Programm für die verteilte Prozessverarbeitung schreiben.
Die verteilte Prozessverarbeitung erfordert einen Serverprozess, der Aufgaben verteilt, und einen Arbeitsprozess, der die Aufgaben tatsächlich verarbeitet. Implementieren Sie zunächst den Serverprozess task_master.py </ code>.
Hier veröffentlicht manager </ code> die Warteschlange als ** api ** im Internet. Sobald der Serverprozess die Warteschlange gestartet und die Aufgabe eingegeben hat, kann von anderen Computern aus auf sie zugegriffen werden.
task_master.py
import random
import queue #Da es über das Netz erfolgt, ist die Standardbibliothekswarteschlange ausreichend
from multiprocessing.managers import BaseManager
#Warteschlange zum Senden von Aufgaben
task_queue = queue.Queue()
#Warteschlange, um Ergebnisse zu erhalten
result_queue = queue.Queue()
class QueueManager(BaseManager):
pass
#Registrieren Sie zwei Warteschlangen als API
#Im Falle von Windows kann Lambda für die API-Registrierung verwendet werden. Bitte definieren Sie die Funktion gehorsam
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
#Verwenden Sie Port 5000 für die Authentifizierungsverschlüsselung'abc'Zu
#Für Windows muss die Adresse angegeben werden (127).0.0.1)
manager = QueueManager(address=('', 5000), authkey=b'abc')
#anfangen
manager.start()
#Warteschlangenobjekt über Netz abrufen
task = manager.get_task_queue()
result = manager.get_result_queue()
#Versuchen Sie, eine Aufgabe zu erledigen
for i in range(10):
n = random.randint(0, 10000)
print('Put task {}...'.format(n))
task.put(n)
#Empfangen Sie das Ergebnis aus der Ergebniswarteschlange
print('Try get results...')
for i in range(10):
#Wenn es 10 Sekunden überschreitet, endet es mit einer Zeitüberschreitung
r = result.get(timeout=10)
print('Result: {}'.format(r))
#Ende
manager.shutdown()
print('master exit.')
Implementieren Sie als Nächstes die Datei task_worker.py </ code> für den Arbeitsprozess. Holen Sie sich die Aufgabe mit ** api ** namens
manager.get_task_queue </ code>, die oben veröffentlicht wurde, und verarbeiten Sie sie.
task_worker.py
import time
import queue
from multiprocessing.managers import BaseManager
#Erstellen Sie denselben Warteschlangenmanager
class QueueManager(BaseManager):
pass
#Holen Sie sich API aus dem Netz und registrieren Sie es in Queue Manager
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
#Stellen Sie eine Verbindung zum Server her
server_addr = '127.0.0.1'
print('Connect to server {}...'.format(server_addr))
#Stellen Sie den gleichen Port und die gleiche Authentifizierungsverschlüsselung ein
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
#Verbindung
m.connect()
#Holen Sie sich jede Warteschlange
task = m.get_task_queue()
result = m.get_result_queue()
#Empfangen Sie eine Aufgabe aus der Aufgabenwarteschlange
#Speichern Sie das Verarbeitungsergebnis in der Ergebniswarteschlange
for i in range(10):
try:
n = task.get(timeout=1)
#Hier ist die Aufgabe die einfache Quadratberechnung.
print('run task {} * {}...'.format(n, n))
r = '{} * {} = {}'.format(n, n, n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('task queue is empty.')
#Ende
print('worker exit.')
Es kann auch auf einem lokalen Computer ausgeführt werden.
Ausführungsergebnis:
Zuerst legt der Serverprozess die Aufgabe zuerst in der task_queue </ code> ab. Wenn Sie alles eingegeben haben, warten Sie auf die Ergebnisse in der
result_queue </ code>.
task_master.py
Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...
Der Arbeitsprozess stellt dann eine Verbindung zum Server her, ruft die Aufgabe in der task_queue </ code> ab und verarbeitet sie. Das Verarbeitungsergebnis wird an
result_queue </ code> gesendet.
task_worker.py
Connect to server 127.0.0.1...
run task 7710 * 7710...
run task 6743 * 6743...
run task 8458 * 8458...
run task 2439 * 2439...
run task 1351 * 1351...
run task 9885 * 9885...
run task 5532 * 5532...
run task 4181 * 4181...
run task 6093 * 6093...
run task 3815 * 3815...
worker exit.
Wenn das Ergebnis in result_queue </ code> eingeht, gibt der Serverprozess es der Reihe nach aus.
task_master.py
Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...
Result: 7710 * 7710 = 59444100
Result: 6743 * 6743 = 45468049
Result: 8458 * 8458 = 71537764
Result: 2439 * 2439 = 5948721
Result: 1351 * 1351 = 1825201
Result: 9885 * 9885 = 97713225
Result: 5532 * 5532 = 30603024
Result: 4181 * 4181 = 17480761
Result: 6093 * 6093 = 37124649
Result: 3815 * 3815 = 14554225
master exit.
Alle Warteschlangen befinden sich im Serverprozess, da der Arbeitsprozess die Warteschlange nicht erstellt.
(Quelle: [Hiroyukimin-ähnliches Regierungsnetzwerk](https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600))Auf diese Weise können verteilte Prozesse in Python realisiert werden. Leistungsstarke Rechenleistung kann durch Verarbeitung mit mehreren Mitarbeitern erzielt werden.
fork () </ code> erläutert, um eine Kopie des aktuellen Prozesses als untergeordneter Prozess zu erstellen. Das Aufrufen von os.fork </ code> in Python erstellt einen untergeordneten Prozess Ihres Python-Programms. Es gibt jedoch Situationen, in denen Sie einen untergeordneten Prozess benötigen, der externe Befehle anstelle eines Python-Programms ausführen kann.
Unter Unix-basierten Betriebssystemen gibt es einen weiteren Systemaufruf exec () </ code>. Es ist in Python als os.execve </ code> implementiert. exec () </ code> ist eine Funktion, die derzeit den Prozess durch ein anderes Programm ersetzt. Das heißt, os.fork </ code> erstellt einen untergeordneten Prozess eines Python-Programms, und os.execve </ code> verwendet andere Programme ( ls </ code>, ls </ code>, die in der Shell ausgeführt werden können. Es kann durch ein Programm wie code> ping </ code> ersetzt werden.
Der Unterprozess der Standardbibliothek </ code> ist ein Modul zum Erstellen untergeordneter Prozesse, die externe Programme ausführen. Wenn Sie dann ein externes Programm mit dem Unterprozess </ code> ausführen, erstellen Sie eine Pipe (Pipe) für die prozessübergreifende Kommunikation zwischen dem Python-Prozess und dem untergeordneten Prozess, übergeben Sie Parameter und senden Sie Rückgabewerte und Fehler. Sie können es empfangen.
3-1. subprocess.run
Ab Python 3.5 wird offiziell empfohlen, den Befehl in subprocess.run </ code> auszuführen. Hier wird die Erklärung wie
subprocess.call </ code> der alten ** api ** weggelassen.
subprocess.run(args, *, stdin=None, input=None,
stdout=None, stderr=None, shell=False, timeout=None, check=False, universal_newlines=False)
subprocess.run </ code> gibt eine Instanz der Klasse
CompletedProcess </ code> zurück. Die Attribute der Klasse
CompletedProcess </ code> lauten wie folgt.
Attribut | Erläuterung |
---|---|
args | An untergeordnete Prozesse übergebene Parameter, Zeichenfolge oder Liste |
returncode | Speichert den Statuscode nach der Ausführung |
stdout | Standardausgabe nach Ausführung |
stderr | Standardfehler nach der Ausführung |
check_returncode() | Löst CalledProcessError aus, wenn der Statuscode ungleich Null ist (Ausführungsfehler) |
Hier einige Beispiele für die Verwendung von subprocess.run </ code>.
Sie können die Standardausgabe mit dem Unterprozess abfangen.PIPE </ code> (andernfalls wird die Ausgabe verworfen).
import subprocess
# subprocess.run(["ls", "-l"] stdout=subprocess.PIPE)Gleich wie
obj = subprocess.run(["ls", "-l"], stdout=subprocess.PIPE)
print('stdout:\n{}'.format(obj.stdout.decode()))
Ausführungsergebnis:
stdout:
total 128
-rw-r--r--@ 1 kaito staff 692 Feb 16 19:35 1-1.py
-rw-r--r--@ 1 kaito staff 509 Feb 17 23:39 1-2.py
-rw-r--r--@ 1 kaito staff 364 Feb 19 16:48 2-10.py
-rw-r--r--@ 1 kaito staff 645 Feb 19 19:12 2-17.py
-rw-r--r--@ 1 kaito staff 213 Feb 19 19:14 2-18.py
-rw-r--r--@ 1 kaito staff 209 Feb 19 19:18 2-19.py
-rw-r--r--@ 1 kaito staff 318 Feb 19 23:53 2-20.py
-rw-r--r--@ 1 kaito staff 194 Feb 19 23:57 2-21.py
-rw-r--r--@ 1 kaito staff 230 Feb 20 15:46 2-23.py
-rw-r--r--@ 1 kaito staff 131 Feb 18 19:39 2-4.py
-rw-r--r--@ 1 kaito staff 543 Feb 18 19:50 2-8.py
-rw-r--r--@ 1 kaito staff 240 Feb 18 22:29 2-9.py
-rw-r--r-- 1 kaito staff 1339 Feb 27 00:25 task_master.py
-rw-r--r-- 1 kaito staff 1086 Feb 27 00:31 task_worker.py
-rw-r--r-- 1 kaito staff 446 Feb 27 20:26 test.py
-rw-r--r-- 1 kaito staff 199 Feb 27 20:31 test2.py
Wenn check </ code> auf True gesetzt ist, tritt ein Fehler auf, wenn der Statuscode ungleich Null ist.
subprocess.run("exit 1", shell=True, check=True)
Ausführungsergebnis:
Traceback (most recent call last):
File "test2.py", line 4, in <module>
subprocess.run("exit 1", shell=True, check=True)
File "/Users/kaito/opt/miniconda3/lib/python3.7/subprocess.py", line 487, in run
output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1.
Die \ _ \ _ repr \ _ \ _ </ code> der Klasse
CompletedProcess </ code> sieht folgendermaßen aus.
print(subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE))
Ausführungsergebnis:
CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0, stdout=b'crw-rw-rw- 1 root wheel 3, 2 Feb 27 20:37 /dev/null\n')
3-2. subprocess.Popen
Für erweiterte Vorgänge können Sie die Klasse subprocess.Popen </ code> verwenden.
class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False,
startup_info=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=())
Die Methode der Klasse subprocess.Popen </ code> lautet wie folgt.
Methode | Erläuterung |
---|---|
poll | Gibt einen Statuscode zurück, wenn der untergeordnete Prozess die Ausführung beendet hat, und gibt None zurück, wenn er nicht abgeschlossen wurde |
wait | Warten Sie, bis der untergeordnete Prozess die Ausführung abgeschlossen hat, und lösen Sie einen TimeoutExpired-Fehler aus, wenn ein Timeout auftritt |
communicate | Kommunizieren Sie mit untergeordneten Prozessen |
send_signal | Senden Sie ein Signal an einen untergeordneten Prozess, z. B. ein Signal.signal(signal.SIGINT)Ist die Befehlszeile des UNIX-basierten Betriebssystems Strg+Signal beim Drücken von C. |
terminate | Beenden Sie den untergeordneten Prozess |
kill | Töte den Kinderprozess |
Hier einige Beispiele für die Verwendung des Unterprozesses . Öffnen Sie </ code>.
Sie können Ihren Python-Code als externes Programm ausführen.
import subprocess
#Verbinden Sie die Rohre mit Standardeingängen, Standardausgängen und Standardfehlern
p = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#Schreiben Sie Daten in die Standardeingabe
p.stdin.write(b'print("stdin")\n')
#Übergeben Sie Daten als Eingabe für die Kommunikation
out, err = p.communicate(input=b'print("communicate")\n')
print(out.decode())
Ausführungsergebnis:
stdin
communicate
Die Pipeline-Verarbeitung mit | </ code> kann erstellt werden, indem die Standardausgabe und die Standardeingabe von zwei untergeordneten Prozessen mit einer Pipe verbunden werden.
#Leiten Sie die beiden untergeordneten Prozesse zusammen
p1 = subprocess.Popen(['df', '-h'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'Data'], stdin=p1.stdout, stdout=subprocess.PIPE)
out, err = p2.communicate() # df -h | grep Data
print(out.decode())
Ausführungsergebnis:
/dev/disk1s1 466Gi 438Gi 8.0Gi 99% 1156881 4881295959 0% /System/Volumes/Data
map auto_home 0Bi 0Bi 0Bi 100% 0 0 100% /System/Volumes/Data/home
Die Entwicklung von Python hat jedoch noch nicht aufgehört. Derzeit wird ein übergeordnetes Modul entwickelt, das gleichzeitiges </ code>
Threading </ code> und
Multiprocessing </ code> weiter verkapselt, um die Verwendung zu vereinfachen.
Der aktuelle gleichzeitig </ code> hat nur ein Modul namens
futures </ code>.
futures </ code> ist [Future pattern](https://ja.wikipedia.org/wiki/Future_%E3%83%91%E3%82%BF%E3%83%BC%E3%83% B3) Python-Implementierung. Hier möchte ich die Funktionen vorstellen, die derzeit verwendet werden können.
concurrent.futures </ code> bietet
ThreadPoolExecutor </ code> und
ProcessPoolExecutor </ code>, die von der Klasse
Executor </ code> erben. Werden.
ThreadPoolExecutor </ code> und
ProcessPoolExecutor </ code> erhalten ein Argument namens
max_works </ code>, das die Anzahl der Threads oder Prozesse angibt. Führt eine einzelne Aufgabe mit der Methode
submit </ code> aus und gibt eine Instanz der Klasse
Future </ code> zurück.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
def load_url(url):
return requests.get(url)
if __name__ == '__main__':
url = 'https://www.python.org/'
executor = ProcessPoolExecutor(max_workers=4) # ThreadPoolExecutor(max_workers=4)
future = executor.submit(load_url, url)
print(future)
while 1:
if future.done():
print('status code: {}'.format(future.result().status_code))
break
Ausführungsergebnis:
<Future at 0x10ae058d0 state=running>
status code: 200
Eine einfache http-Anfrage. Beachten Sie, dass bei Verwendung von ProcessPoolExecutor </ code> das Modul
\ _ \ _ main__ </ code> erforderlich ist. Führen Sie es daher nicht in einer REPL-Umgebung aus.
The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.
Die Methode submit </ code> kann nur eine Aufgabe ausführen. Wenn Sie also mehrere Aufgaben ausführen möchten, müssen Sie
map </ code>,
as_completed </ code> und
wait <ausführen Verwenden Sie / code>.
Die Methode map </ code> verwendet eine Ausführungsfunktion und -sequenz als Argumente und gibt einen Generator für Ausführungsergebnisse zurück.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']
def load_url(url):
return requests.get(url)
if __name__ == '__main__':
# with ThreadPoolExecutor(max_workers=4) as executor:
with ProcessPoolExecutor(max_workers=4) as executor:
for url, data in zip(URLS, executor.map(load_url, URLS)):
print('{} - status_code {}'.format(url, data.status_code))
Ausführungsergebnis:
https://google.com - status_code 200
https://www.python.org/ - status_code 200
https://api.github.com/ - status_code 200
Die Methode as_completed </ code> gibt einen Generator für das Objekt
Future </ code> zurück. Und es blockiert, wenn die Aufgabe nicht abgeschlossen ist.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import requests
URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']
def load_url(url):
return url, requests.get(url).status_code
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
tasks = [executor.submit(load_url, url) for url in URLS]
for future in as_completed(tasks):
print(*future.result())
Ausführungsergebnis:
https://google.com 200
https://www.python.org/ 200
https://api.github.com/ 200
Die Methode wait </ code> blockiert den Hauptthread und den Hauptprozess. Sie können drei Bedingungen mit dem Argument
return_when </ code> festlegen.
Bedingungen | Erläuterung |
---|---|
ALL_COMPLETED | Blockierung freigeben, wenn alle Aufgaben abgeschlossen sind |
FIRST_COMPLETED | Lösen Sie die Blockierung, wenn eine Aufgabe abgeschlossen ist |
FIRST_EXCEPTION | Blockierung aufheben, wenn eine Aufgabe einen Fehler verursacht |
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED
import requests
URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']
def load_url(url):
requests.get(url)
print(url)
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
tasks = [executor.submit(load_url, url) for url in URLS]
wait(tasks, return_when=ALL_COMPLETED)
print('all completed.') #Nach 3 Drucken wird der Hauptprozess zum Drucken freigegeben
Ausführungsergebnis:
https://www.python.org/
https://api.github.com/
https://google.com
all completed.
Parallele Ausführung Threading --- Threading-Parallelverarbeitung Multiprocessing --- Prozessbasierte Parallelverarbeitung Subprozess --- Subprozessverwaltung concurrent.futures - Parallele Taskausführung
Recommended Posts