Vollständiges Verständnis von Python-Threading und Multiprocessing

Threading und Multiprocessing

Die wichtigsten modernen Betriebssysteme sind Mac OS, UNIX, Linux und Windows. Diese Betriebssysteme unterstützen die Funktion "Multitasking".

Was ist Multitasking? Sie könnten beispielsweise denken, dass in einer Situation, in der Sie einen Browser starten, Musik hören und einen Bericht in Word schreiben, mindestens drei Aufgaben gleichzeitig ausgeführt werden. Und neben den Aufgaben auf der Vorderseite laufen heimlich verschiedene Aufgaben im Zusammenhang mit dem Betriebssystem hinter den Kulissen.

Es ist leicht zu verstehen, dass eine Multi-Core-CPU Multitasking verarbeiten kann, eine Single-Core-CPU jedoch auch Multitasking. Das Betriebssystem führt abwechselnd jede Aufgabe aus. Beispielsweise beträgt Aufgabe 1 0,01 Sekunden, Aufgabe 2 0,01 Sekunden, Aufgabe 3 0,01 Sekunden, Aufgabe 1 0,01 Sekunden usw. Die CPU ist schnell und fühlt sich fast gleichzeitig an. Diese alternative Ausführung wird häufig als ["Concurrent Computing"] bezeichnet (https://ja.wikipedia.org/wiki/%E4%B8%A6%E8%A1%8C%E8%A8%88%E7 % AE% 97).

Natürlich werden Single-Core-CPUs nacheinander ausgeführt, so dass im wahrsten Sinne des Wortes ein gleichzeitiger Fortschritt nur für Multi-Core-CPUs möglich ist. Die Verarbeitung mehrerer Aufgaben auf jedem Kern gleichzeitig mit einer Mehrkern-CPU ist "[Parallel Computing](https://ja.wikipedia.org/wiki/%E4%B8%A6%E5%] 88% 97% E8% A8% 88% E7% AE% 97) ". In den meisten Fällen übersteigt die Anzahl der ausgeführten Aufgaben die Anzahl der Kerne bei weitem, sodass die Arbeit zur "Schichtausführung" auch im Mehrkern ausgeführt wird.

Für das Betriebssystem ist eine Aufgabe ein Prozess. Wenn Sie beispielsweise einen Browser starten, wird ein einzelner Browserprozess erstellt. Ebenso wird beim Öffnen von Word ein Word-Prozess erstellt.

Ein Prozess ist nicht unbedingt ein Prozess. Beispielsweise führt Word viele Verarbeitungsschritte durch, z. B. die Überwachung von Benutzereingaben, die Rechtschreibprüfung und die Anzeige der Benutzeroberfläche. Diese "Unteraufgaben" werden als Threads bezeichnet. Es gibt mindestens einen Thread pro Prozess. Wenn mehrere Threads vorhanden sind, wechseln sie sich wie Prozesse ab.

Es gibt zwei Möglichkeiten, Multitasking in Python gleichzeitig zu verarbeiten.

Natürlich können Sie mehrere Threads in mehreren Prozessen haben, dies wird jedoch aufgrund der Komplexität des Modells nicht empfohlen.

Bei der Verarbeitung von Mehrfachaufgaben kann die Kommunikation und Zusammenarbeit zwischen Aufgaben erforderlich sein, Aufgabe 1 muss möglicherweise angehalten werden, wenn Aufgabe 2 ausgeführt wird, oder Aufgabe 3 und Aufgabe 4 können möglicherweise nicht gleichzeitig fortfahren. Das Programm wird etwas kompliziert.

スレッドとプロセスの関係+プロセスはOSから割り当てられた様々なリソースを持っている..jpg (Quelle: Überblick über die Vorlesung zur Systemsoftware)

  1. threading In Unix-basierten Betriebssystemen können die folgenden Systemaufruffunktionen hauptsächlich für die folgenden Threads verwendet werden.
Funktion Erläuterung
start() Starten Sie einen Thread
setName() Gib dem Thread einen Namen
getName() Holen Sie sich den Namen des Threads
setDaemon(True) FadenDaemonZu
join() Warten Sie, bis der Thread die Verarbeitung abgeschlossen hat
run() Führen Sie die Thread-Verarbeitung manuell aus

Python-Threads werden nicht prozesssimuliert, sondern sind echte POSIX-Threads 83% E3% 83% 89). In der Standardbibliothek können Sie zwei Module verwenden: _thread </ code> und threading </ code>. Und _thread </ code> ist ein Modul auf niedriger Ebene, und threading </ code> ist ein Modul, das es kapselt. Daher verwende ich normalerweise Threading </ code>.

1-1. Instanziierung

Sie können einen Thread starten, indem Sie eine Funktion usw. einführen, um eine Instanz von Thread </ code> zu erstellen und mit start </ code> zu starten.

import threading
import time


def run(n):
    # threading.current_thread().Name ist getName()Anruf
    print("task: {} (thread name: {})".format(n, threading.current_thread().name))
    time.sleep(1)
    print('2s')
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)


t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",), name='Thread T2') #Hier setName()Wird genannt
# start()
t1.start()
t2.start()
# join()
t1.join()
t2.join()
# join()Weil ich angerufen habe
#Der Haupt-Thread wartet, bis der obige Thread abgeschlossen ist
#Drucken, wenn alles erledigt ist
print(threading.current_thread().name)

Ausführungsergebnis:

task: t1 (thread name: Thread-1)
task: t2 (thread name: Thread T2)
2s
2s
1s
1s
0s
0s
MainThread

Sie können sehen, dass t1 und t2 abwechselnd ausgeführt werden. Eine der Wechselregeln wird in GIL nach der E / A-Operation (wo die Operation print </ code> gilt) ausführlicher erläutert.

1-2. Anpassen

Es ist auch möglich, Thread </ code> zu erben und die run </ code> -Methode der Thread-Klasse anzupassen.

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()
        self.n = n

    # run()Umschreiben
    def run(self):
        print("task: {}".format(self.n))
        time.sleep(1)
        print('2s')
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)


t1 = MyThread("t1")
t2 = MyThread("t2")

t1.start()
t2.start()

Ausführungsergebnis:

task: t1
task: t2
2s
2s
1s
1s
0s
0s

1-3 Berechnen Sie die Anzahl der Threads

Sie können die Anzahl der aktiven Threads mit active_count </ code> zählen. In einer REPL-Umgebung müssen jedoch mehrere Threads überwacht werden, sodass die Anzahl der Threads höher als erwartet ist.

Führen Sie den folgenden Code in einem Skript aus.

import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(1)


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    t.start()

time.sleep(0.5)
print(threading.active_count())

Ausführungsergebnis:

task: t0
task: t1
task: t2
4

Wenn der Hauptthread print </ code> ausgeführt wird, ist die Anzahl der Threads = 3 + 1 (Hauptthread), da andere Threads noch ausgeführt werden.

import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(0.5)


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    t.start()

time.sleep(1)
print(threading.active_count())

Ausführungsergebnis:

task: t0
task: t1
task: t2
1

Durch Anpassen der Ausführungszeit und Verzögern des -Drucks </ code> des Hauptthreads wird die Anzahl der aktiven Threads nur für den Hauptthread zu 1.

1-4 Dämonenfaden

Starten Sie den Thread als Daemon.

import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(1)
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    # setDaemon(True)
    t.setDaemon(True) 
    t.start()

time.sleep(1.5)
print('Anzahl der Themen: {}'.format(threading.active_count()))

Ausführungsergebnis:

task: t0
task: t1
task: t2
3
3
3
Anzahl der Themen: 4

Da t1, t2 und t3 als Daemon-Thread des Haupt-Threads festgelegt sind, werden sie gestoppt, wenn der Haupt-Thread endet. Beispielsweise ist die Rechtschreibprüfung von Word ein Daemon-Thread, der in einer Endlosschleife ausgeführt wird. Wenn der Haupt-Thread jedoch ausfällt, wird er ausgefallen. 1-5. GIL Bei Verwendung einer Multi-Core-CPU in anderen Programmiersprachen können Threads mit der Anzahl der Kerne gleichzeitig ausgeführt werden. In Python wird jedoch jeweils nur ein Thread ausgeführt. Dies ist ein Prozess. Mit anderen Worten, Python-Multithreading ist vollständig parallel. Der Grund ist [GIL (Global Interpreter Lock)](https://ja.wikipedia.org/wiki/%E3%82%B0%E3%83%AD%E3%83%BC%E3%83%90%E3 % 83% AB% E3% 82% A4% E3% 83% B3% E3% 82% BF% E3% 83% 97% E3% 83% AA% E3% 82% BF% E3% 83% AD% E3% 83 Es befindet sich bei% 83% E3% 82% AF).

GIL ist eine Art exklusive Kontrolle (wird später erklärt). Als Python zum ersten Mal entwickelt wurde, haben wir GIL implementiert, um die Kombination mit Datensicherheits- und C-Sprachbibliotheken zu vereinfachen. Wenn Sie einen Thread ausführen, benötigen Sie eine GIL. In einem Python-Interpreter gibt es nur einen Python-Prozess. Und da es in einem Python-Prozess nur eine GIL gibt, kann jeweils nur ein Thread ausgeführt werden. GIL ist wie ein Pass, und Threads ohne GIL können nicht in die CPU gelangen. GIL ist übrigens in CPython (normale Python-Distribution), aber nicht in PyPy und Jython. Eine andere bekannte Sprache mit GIL ist Ruby.

1-5-1. Multithreading-Prozedur in CPython

  1. Ressourcen abrufen
  2. Fordern Sie GIL an
  3. Python-Interpreter beschafft systemeigene Threads
  4. Das Betriebssystem betreibt die CPU zur Berechnung
  5. Sobald die GIL-Erfassungsregeln erfüllt sind, wird die GIL erfasst, um festzustellen, ob die Berechnung abgeschlossen ist.
  6. Ein anderer Thread wiederholt die obigen Schritte
  7. Wenn die GIL erneut auftritt, verarbeiten Sie die Fortsetzung der vorherigen, bis die GIL-Wiederherstellungsregel erneut erfüllt ist (Kontextwechsel).

1-5-2. Verschiedene Versionen der GIL-Wiederherstellungsregeln

  • Python 2.X
  • Sammeln, wenn eine E / A-Operation auftritt
  • Sammeln, wenn die Zecken 100 erreichen
  • Ticks sind Zähler für GIL, die die Anzahl der virtuellen Python-Prozesse aufzeichnen
  • Wenn 100 erreicht ist, wird GIL gesammelt und auf 0 zurückgesetzt.
  • Sie können den Schwellenwert mit sys.setcheckinterval </ code> festlegen
  • Python 3.X
  • Zecken wurden verworfen
  • Messen Sie die Zeit mit einem Timer und erfassen Sie, wenn der Schwellenwert überschritten wird

Lassen Sie uns als Experiment eine einfache Endlosschleife ausführen.

import threading
import multiprocessing


def loop():
    x = 0
    while True:
        x = x ^ 1


for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()
Screen Shot 2020-02-26 at 19.02.51.png

Wie Sie sehen können, liegt die CPU-Auslastung aufgrund von GIL in einem einzigen Prozess nur bei etwa 100%, unabhängig davon, wie sehr Sie es versuchen (auf einer Quad-Core-CPU sollten bis zu 400% verfügbar sein).

1-5-3. Recheneffizienz von Python-Programmen für verschiedene Arten von Aufgaben

  • CPU-gebundene Aufgabe
  • Nach einer bestimmten Zeit wird die GIL gesammelt und der Thread gewechselt, was die Berechnungskosten erhöht und verlangsamt.
  • IO-gebundene Aufgabe
  • Threads jedes Mal umschalten, wenn eine E / A-Operation ausgeführt wird. Es ist effizient, da es für andere Verarbeitungen verwendet werden kann, ohne auf langsames Lesen und Schreiben von Dateien zu warten.
  • Wenn Sie die Multi-Core-CPU optimal nutzen möchten, wird Multiprocessing empfohlen. Jeder Prozess hat seine eigene GIL.

1-6 Gewindesteuerung

Ressourcen werden von Threads im selben Prozess gemeinsam genutzt. Und weil die Threads zufällig und nicht in der richtigen Reihenfolge gewechselt werden, können die Daten durcheinander geraten.

import threading


#Geld sparen
balance = 0


def change_it(n):
    #Auszahlung und Einzahlung sollten 0 sein
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    for i in range(100000):
        change_it(n)


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

Wie Sie sehen können, indem Sie den obigen Code mehrmals ausführen, ist das Ergebnis ungleich Null.

balance = balance + n </ code> kann in zwei atomare Operationen aufgeteilt werden.

x = balance + n
balance = x

Das x </ code> ist hier eine lokale Variable, und jeder Thread hat seinen eigenen x </ code>. Wenn der obige Code der Reihe nach ausgeführt wird, wird er wie folgt.

balance = 0 #Ursprünglicher Wert

t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1     # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1     # balance = 0

t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2     # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2     # balance = 0
    
balance = 0 #Das Ergebnis ist korrekt

Wenn die Reihenfolge jedoch unterschiedlich ist, ist das Ergebnis unterschiedlich.

balance = 0 #Ursprünglicher Wert

t1: x1 = balance + 5  # x1 = 0 + 5 = 5

t2: x2 = balance + 8  # x2 = 0 + 8 = 8
t2: balance = x2      # balance = 8

t1: balance = x1      # balance = 5
t1: x1 = balance - 5  # x1 = 5 - 5 = 0
t1: balance = x1      # balance = 0

t2: x2 = balance - 8  # x2 = 0 - 8 = -8
t2: balance = x2   # balance = -8

balance = -8 #Das Ergebnis ist falsch

Dieses Phänomen der unvorhersehbaren Berechnung führt zu Multithreading und wird als Thread-unsicher bezeichnet.

Um dies zu lösen, müssen Sie den Thread sperren und steuern.

1-6-1. Exklusive Kontrolle (Mutex)

import threading


#Geld sparen
balance = 0


def change_it(n):
    #Holen Sie sich das Schloss
    lock.acquire()
    global balance
    balance = balance + n
    balance = balance - n
    #Lassen Sie das Schloss los
    lock.release()


def run_thread(n):
    for i in range(100000):
        change_it(n)


lock = threading.Lock()  #Instanziieren Sie eine Sperre

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

Durch die Verwendung der exklusiven Steuerung kann kein anderer Thread auf die Ressource zugreifen, bis die Sperre aufgehoben wird. Auf diese Weise ist das Berechnungsergebnis immer 0.

1-6-2. Rekursive exklusive Steuerung

Exklusive Steuerung, mit der verschachtelte Sperren rekursiv freigegeben werden können.

import threading


#Geld sparen
balance = 0


def add_it(n):
    lock.acquire()
    global balance
    balance = balance + n
    return balance


def sub_it(n):
    lock.acquire()
    global balance
    balance = balance - n
    return balance


def change_it(n):
    #Holen Sie sich das Schloss
    lock.acquire()
    global balance
    balance = add_it(n)
    balance = sub_it(n)
    #Sperre rekursiv aufheben
    lock.release()


def run_thread(n):
    for i in range(1000):
        change_it(n)


lock = threading.RLock()  #Instanziieren Sie eine Sperre

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

Hier wird die Sperre auch in add_it </ code> und sub_it </ code> erworben. Durch die Verwendung der rekursiven exklusiven Steuerung ist es nicht erforderlich, jede Sperre aufzuheben, und alle können auf einmal freigegeben werden. Es ist jedoch sehr rechenintensiv, sodass wir die Anzahl der Schleifen reduzieren.

1-6-3. BoundedSemaphore-Steuerung

Mit der exklusiven Kontrolle können Ressourcen zu einem bestimmten Zeitpunkt nur von einem Thread verarbeitet werden, während Semapho eine Grenze darstellt, die die gleichzeitige Verarbeitung einer bestimmten Anzahl von Threads ermöglicht. Zum Beispiel ist eine Situation, in der drei Toilettensitze in der Toilette sind, drei Personen sie gleichzeitig benutzen und andere in der Schlange stehen, ein Semapho.

import threading
import time


def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("current thread: {}\n".format(n))
    semaphore.release()


semaphore = threading.BoundedSemaphore(5)  #Ermöglicht die gleichzeitige Verarbeitung von 5 Threads

for i in range(22):
    t = threading.Thread(target=run, args=("t-{}".format(i),))
    t.start()

while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('-----Alle Threads sind beendet-----')

Wenn Sie den obigen Code ausführen, können Sie sehen, dass die aktuellen Thread-Zeichenfolgen fünf mal fünf ausgegeben werden.

1-6-4. Ereignissteuerung

Thread-Ereignisse dienen dazu, dass der Haupt-Thread andere Threads steuert. Die folgenden Methoden werden für Ereignis </ code> bereitgestellt.

Methode Erläuterung
clear Setzen Sie das Flag auf False
set Setzen Sie das Flag auf True
is_set Gibt True zurück, wenn das Flag True ist
wait Überwachen Sie die Flagge weiter und blockieren Sie sie, wenn die Flagge falsch ist
import threading
import time

event = threading.Event()


def lighter():
    '''
    flag=True:Grünes Licht
    flag=False:Rotlicht
    '''
    count = 0
    event.set()  #Der Anfangswert ist grünes Licht
    while True:
        if 5 < count <= 10:
            event.clear()  #Mach ein rotes Licht
            print("\33[41;1m rotes Licht...\033[0m")
        elif count > 10:
            event.set()  #Machen Sie grünes Licht
            count = 0
        else:
            print("\33[42;1m grünes Licht...\033[0m")

        time.sleep(1)
        count += 1


def car(name):
    while True:
        if event.is_set():  #Überprüfen Sie, ob das grüne Licht leuchtet
            print("[{}]Voraus...".format(name))
            time.sleep(1)
        else:
            print("[{}]Warten Sie wegen des roten Lichts auf das Signal...".format(name))
            event.wait()
            # flag=Blockiere hier bis True
            print("[{}]Bewegen Sie sich aufgrund von grünem Licht vorwärts...".format(name))


light = threading.Thread(target=lighter,)
light.start()

car = threading.Thread(target=car, args=("MINI",))
car.start()

Mit dem obigen Code haben wir auf der Veranstaltung eine einfache Kommunikation zwischen der Ampel und dem Faden des Autos realisiert.

1-6-5 Timer-Steuerung

Sie können auch einen Timer verwenden, um den Thread nach Zeit zu steuern.

from threading import Timer


def hello():
    print("hello, world")


t = Timer(1, hello)
t.start()  #Hallo wird nach 1 Sekunde ausgeführt

1-6-6. Zustandskontrolle

Es gibt auch eine Methode, um den Thread durch Beurteilen des Zustands zu steuern. Die folgenden Methoden werden unter Bedingung </ code> bereitgestellt.

Methode Erläuterung
wait Hängen Sie den Thread, bis er benachrichtigt wird oder wenn die Zeitüberschreitung des Arguments erreicht ist
notify Hung Thread (Standard n=Benachrichtigen 1); kann nur mit dem erworbenen Schloss verwendet werden
notifyAll Benachrichtigen Sie alle hängenden Threads
import threading
import time
from random import randint
from collections import deque


class Producer(threading.Thread):
    def run(self):
        global stocks
        while True:
            if lock_con.acquire():
                products = [randint(0, 100) for _ in range(5)]
                stocks.extend(products)
                print('Produzent{}Ist{}Produziert.'.format(self.name, stocks))
                lock_con.notify()
                lock_con.release()
            time.sleep(3)


class Consumer(threading.Thread):
    def run(self):
        global stocks
        while True:
            lock_con.acquire()
            if len(stocks) == 0:
                #Warten Sie, bis es produziert wird, wenn das Produkt leer ist
                #Hängen Sie den Thread, bis er benachrichtigt wird
                lock_con.wait()
            print('Kunde{}Ist{}Gekauft. Lager: {}'.format(self.name, stocks.popleft(), stocks))
            lock_con.release()
            time.sleep(0.5)


stocks = deque()
lock_con = threading.Condition()
p = Producer()
c = Consumer()
p.start()
c.start()

Ausführungsergebnis:

Produzententhread-1 ist deque([73, 2, 93, 52, 21])Produziert.
Kunden-Thread-2 kaufte 73. Lager: deque([2, 93, 52, 21])
Kunden-Thread-2 gekauft 2. Lager: deque([93, 52, 21])
Kunden-Thread-2 kaufte 93. Lager: deque([52, 21])
Kunden-Thread-2 kaufte 52. Lager: deque([21])
Kunden-Thread-2 kaufte 21. Lager: deque([])
Produzententhread-1 ist deque([6, 42, 85, 56, 76])Produziert.
Kunden-Thread-2 kaufte 6. Lager: deque([42, 85, 56, 76])
Kunden-Thread-2 kaufte 42. Lager: deque([85, 56, 76])
Kunden-Thread-2 kaufte 85. Lager: deque([56, 76])
Kunden-Thread-2 kaufte 56. Lager: deque([76])
Kunden-Thread-2 kaufte 76. Lager: deque([])

Es ist ein einfaches Programm, bei dem der Hersteller 5 Produkte produziert, wenn der Kunde den gesamten Bestand kauft.

1-6-7 Barrierekontrolle

Es ist eine Steuerung, die gemeinsam ausgeführt wird, wenn die angegebene Anzahl von Threads die Barriere passiert. In einem Online-Spiel können Sie beispielsweise eine Barriere implementieren, die eine bestimmte Zeit wartet, bis das Team eine bestimmte Anzahl von Personen erreicht. Die folgenden Methoden werden in Barrier </ code> bereitgestellt.

Methode Erläuterung
wait Threads passieren die Barriere. Nachdem die angegebene Anzahl von Threads passiert ist, werden alle wartenden Threads freigegeben
reset Leeren Sie die Barriere und geben Sie BrokenBarrierError an den wartenden Thread zurück
abort Brach die Barriere in einen gebrochenen Zustand, alle aktuellen Threads wurden beendet und BrokenBarrierError an Threads zurückgegeben, die danach versuchen, die Barriere zu passieren
import threading

num = 4


def start():
    print('{}Seit ich eine Person bin, hat das Spiel begonnen.'.format(num))


lock = threading.Lock()
barrier = threading.Barrier(num, action=start)


class Player(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if not barrier.broken:
                print('{}Teilgenommen.'.format(self.name))
                barrier.wait(2)
        except threading.BrokenBarrierError:
            print('Weil das Spiel nicht gestartet werden kann{}Hat verlassen.'.format(self.name))


players = []
for i in range(10):
    lock = threading.Lock()
    p = Player(name='Player {}'.format(i))
    players.append(p)

for p in players:
    p.start()

Ausführungsergebnis

Spieler 0 hat teilgenommen.
Spieler 1 hat teilgenommen.
Spieler 2 hat teilgenommen.
Spieler 3 hat teilgenommen.
Das Spiel begann, weil es 4 Personen gab.
Spieler 4 hat teilgenommen.
Spieler 5 hat teilgenommen.
Spieler 6 hat teilgenommen.
Spieler 7 hat teilgenommen.
Das Spiel begann, weil es 4 Personen gab.
Spieler 8 hat teilgenommen.
Spieler 9 hat teilgenommen.
Spieler 8 ist gegangen, weil das Spiel nicht gestartet werden kann.
Spieler 9 ist gegangen, weil das Spiel nicht gestartet werden kann.

Threads werden zufällig ausgeführt, sodass sie nicht immer in der oben angegebenen Reihenfolge ausgegeben werden. Hier mussten die Teams von Spieler 8 und Spieler 9 (Barrieren) das Team verlassen (BrokenBarrierError), da sie die angegebene Anzahl nicht rechtzeitig erreichten.

1-7. ThreadLocal Ich habe erklärt, dass Sie die Daten zwischen Threads gemeinsam nutzen müssen, um sie zu berechnen und die genaue Ausgabe zu berechnen. Es gibt jedoch Situationen, in denen jeder Thread seine eigenen lokalen Variablen verarbeiten soll.

import threading


#Erstellen Sie ein ThreadLocal-Objekt im globalen Bereich
local_school = threading.local()

def process_student():
    #Verdiene dir Schüler, die sich auf den aktuellen Thread beziehen
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    #Binden Sie den Namen an den Schüler in ThreadLocal
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

Ausführungsergebnis:

Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

Die local_school </ code> ist hier eine globale Variable. Da es sich jedoch um ein ThreadLocal </ code> -Objekt handelt, kann die Instanzvariable student </ code> festgelegt werden, ohne dass sich die einzelnen Threads gegenseitig beeinflussen. Sie können es bedienen. Sie können local_school </ code> als Wörterbuch betrachten und Lehrer </ code> sowie Schüler </ code> binden. Und jeder Thread kann beliebig arbeiten und beeinflusst sich nicht gegenseitig. Bei Verwendung von ThreadLocal </ code> können Sie für jeden Thread eine eigene DB-Verbindung, http-Anforderung usw. erstellen. Aus der Sicht eines Threads sind alle empfangenen Daten wie eine lokale Variable und können von jedem anderen Thread bearbeitet werden.

  1. multiprocessing Unter Unix-basierten Betriebssystemen können Sie einen Prozess mit dem Systemaufruf fork () </ code> erstellen. Durch Aufrufen von fork () </ code> wird der aktuelle Prozess kopiert. Der kopierte Prozess wird als untergeordneter Prozess bezeichnet, und der ursprüngliche Prozess wird zum übergeordneten Prozess. Der Rückgabewert von fork () </ code> wird sowohl an den untergeordneten als auch an den übergeordneten Prozess zurückgegeben. Der Rückgabewert des untergeordneten Prozesses ist 0, und die ID des untergeordneten Prozesses wird im übergeordneten Prozess zurückgegeben. Der Grund ist, dass der übergeordnete Prozess die ID des untergeordneten Prozesses aufzeichnen muss. Sie können die ID des übergeordneten Prozesses vom untergeordneten Prozess mit getppid </ code> abrufen.

Das Python OS </ code> -Modul kapselt das Systemaufrufsystem.

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

Ausführungsergebnis:

Process (19148) start...
I (19148) just created a child process (19149).
I am child process (19149) and my parent is 19148.

Hier geben der übergeordnete Prozess und der untergeordnete Prozess unterschiedliche bedingte Zweige ein. Bitte beachten Sie, dass Windows keinen Systemaufruf fork () </ code> hat und nicht ausgeführt werden kann.

Mit fork () </ code> kann ein neuer Prozess erstellt und verarbeitet werden, wenn ein Prozess eine neue Aufgabe übernimmt. Mit dem bekannten Apache-Server kann der übergeordnete Prozess beispielsweise den Port und fork () </ code> überwachen, damit der untergeordnete Prozess neue http-Anforderungen verarbeiten kann.

Beim Schreiben von Python-Multiprozessprogrammen wird empfohlen, das Modul multiprocessing </ code> der Standardbibliothek zu verwenden. Das Multiprocessing </ code> -Modul ist ein Modul, das parallel verarbeitet werden kann. Es wird auch gesagt, dass das Modul Multiprocessing </ code> implementiert wurde, da das Modul threading </ code> aufgrund von GIL nicht parallel verarbeitet werden kann.

Das Multiprocessing </ code> -Modul ist auch plattformübergreifend, sodass Sie Multiprozessprogramme unter Windows erstellen können. Wie oben erwähnt, verfügt Windows nicht über fork () </ code>. Wenn Sie also einen Prozess mit dem Multiprocessing </ code> -Modul erstellen, wird eine Pseudo fork () </ code> erstellt Ich verarbeite. Die Möglichkeit hierfür besteht darin, alle Python-Objekte im übergeordneten Prozess mit Pickle </ code> zu serialisieren und an den untergeordneten Prozess zu übergeben. Wenn der Aufruf des Multiprocessing </ code> -Moduls unter Windows fehlschlägt, ist die Pickle </ code> möglicherweise fehlgeschlagen.

Wenn Sie einen untergeordneten Prozess erstellen und einen externen Befehl ausführen möchten, können Sie den Unterprozess der Standardbibliothek </ code> verwenden. Hier wird jedoch zunächst die Python-Verarbeitung im Multiprozessmodul multiprocessing </ code> ausgeführt. Ich werde die Funktion vorstellen.

2-1. Prozess

Sie können untergeordnete Prozesse einfach mithilfe von Prozessen erstellen.

from multiprocessing import Process
import os


#Was der untergeordnete Prozess tut
def run_proc(name):
    print('Run child process {} ({})...'.format(name, os.getpid()))


print('Parent process {}.'.format(os.getpid()))
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')

Ausführungsergebnis:

Parent process 19218.
Child process will start.
Run child process test (19219)...
Child process end.

Übergeben Sie die Ausführungsfunktion und die Argumente an Process </ code>, erstellen Sie eine Instanz und starten Sie sie mit start </ code>. Sie können ganz einfach einen untergeordneten Prozess aus fork () </ code> erstellen. Wenn Sie hier join </ code> verwenden, wartet der übergeordnete Prozess, bis die Ausführung des untergeordneten Prozesses abgeschlossen ist, genau wie wenn es sich um einen Thread handelt.

2-2. Prozesspool

Das Erstellen eines untergeordneten Prozesses ist sehr rechenintensiv. Wenn Sie also eine große Anzahl von Prozessen erstellen möchten, ist es effizienter, einen Prozesspool mit Pool </ code> zu erstellen. Die Hauptmethoden von Pool </ code> sind wie folgt.

Methode Erläuterung
apply Synchrone Verarbeitung
apply_async Asynchrone Verarbeitung
terminate Beenden Sie sofort
join Der übergeordnete Prozess wartet darauf, dass der untergeordnete Prozess die Verarbeitung beendet. Die Prozessverknüpfung kann erst nach dem Schließen oder Beenden erfolgen
close Beenden Sie, wenn alle Prozesse abgeschlossen sind
from multiprocessing import Pool
import os
import time
import random


def long_time_task(name):
    print('Run task {} ({})...'.format(name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task {} runs {} seconds.'.format(name, (end - start)))


print('Parent process {}.'.format(os.getpid()))
p = Pool(4)  #Bis zu 4 untergeordnete Prozesse gleichzeitig
for i in range(5):
    p.apply_async(long_time_task, args=(i,))
#Aufgrund der asynchronen Verarbeitung muss der übergeordnete Prozess nicht auf die Verarbeitung des untergeordneten Prozesses warten.
#Mach den nächsten Druck
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')

Ausführungsergebnis:

Parent process 19348.
Waiting for all subprocesses done...
Run task 0 (19349)...
Run task 1 (19350)...
Run task 2 (19351)...
Run task 3 (19352)...
Task 1 runs 0.8950300216674805 seconds.
Run task 4 (19350)...
Task 2 runs 1.0132842063903809 seconds.
Task 4 runs 0.3936619758605957 seconds.
Task 3 runs 2.3689510822296143 seconds.
Task 0 runs 2.776203155517578 seconds.
All subprocesses done.

Da die Poolgröße 4 beträgt, wird Task 4 ausgeführt, nachdem Task 0 bis Task 3 abgeschlossen sind.

2-3 Kommunikation zwischen Prozessen

Im Gegensatz zu Threads werden Daten nicht zwischen Prozessen geteilt. Das Betriebssystem bietet viele Methoden der prozessübergreifenden Kommunikation. Multiprocessing </ code> kapselt Funktionen des Betriebssystems auf niedriger Ebene, um die Verwendung zu vereinfachen.

2-3-1. Warteschlange

FIFO-Datenstrukturwarteschlangen werden häufig für die Kommunikation zwischen Prozessen verwendet.

from multiprocessing import Process, Queue
import os
import time
import random


#Daten in die Warteschlange schreiben
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put {} to queue...'.format(value))
        q.put(value)
        time.sleep(random.random())


#Daten aus der Warteschlange lesen
def read(q):
    print('Process to read: {}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get {} from queue.'.format(value))


#Der übergeordnete Prozess erstellt eine Warteschlange und übergibt sie an den untergeordneten Prozess
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
#Starten Sie pw und beginnen Sie zu schreiben
pw.start()
#Starten Sie pr und beginnen Sie zu lesen
pr.start()
#Warten Sie, bis pw fertig ist
pw.join()
#pr ist eine Endlosschleife, also töte
pr.terminate()

Ausführungsergebnis:

Process to write: 19489
Put A to queue...
Process to read: 19490
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Selbst wenn der Messwert langsam ist, kann er aufgrund des FIFO in der richtigen Reihenfolge abgerufen werden.

2-3-2. Rohr

Wie der Name schon sagt, können Pipes als rohrförmige Datenstrukturen betrachtet werden. Daten werden übertragen, indem Daten auf einer Seite der Pipe abgelegt werden ( send </ code> -Methode) und Daten auf der anderen Seite empfangen werden ( recv </ code> -Methode). Bitte beachten Sie, dass Daten beschädigt werden können, wenn zwei Prozesse gleichzeitig Daten desselben Typs speichern oder empfangen.

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()

Ausführungsergebnis:

[42, None, 'hello']

2-3-3. Gemeinsamer Speicher

Ich erklärte, dass die Daten zwischen Prozessen nicht geteilt werden, aber es ist tatsächlich eine Lüge ... In Abhängigkeit vom Betriebssystem [gemeinsam genutzter Speicher] zwischen Prozessen (https://ja.wikipedia.org/wiki/%E5%85%B1%E6%9C%89%E3%83%A1%E3%83%A2% E3% 83% AA #% E3% 82% BD% E3% 83% 95% E3% 83% 88% E3% 82% A6% E3% 82% A7% E3% 82% A2% E3% 81% AB% E3 % 82% 88% E3% 82% 8B% E5% 85% B1% E6% 9C% 89% E3% 83% A1% E3% 83% A2% E3% 83% AA) können hergestellt werden. In Python können Sie mit Value </ code> und Array </ code> numerische Daten und Array-Daten im gemeinsam genutzten Speicher speichern. Abgesehen davon verwenden Value </ code> und Array </ code> die Datenstruktur der C-Sprache so wie sie ist. Pythons Zahlen (die die Zahlenklasse erben) ist unveränderlich und kann nicht direkt umgeschrieben werden. ..

from multiprocessing import Process, Value, Array


def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]


num = Value('d', 0.0)  #doppelte Typennummer
arr = Array('i', range(10))  #Array

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print(num.value)
print(arr[:])

Ausführungsergebnis:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
    • Das Modul Multiprocessing.shared_memory wurde aus Python 3.8 hinzugefügt. Ich werde aktualisieren, wenn Miniconda aktualisiert wird. * *

2-3-4. Manager

Es kann genauer sein zu sagen, dass der Manager die Daten teilt, anstatt sie zu übermitteln. Manager () </ code> gibt ein Managerobjekt zurück und erstellt einen Serverprozess. Über den Serverprozess können andere Prozesse auf Proxy-Weise mit Python-Objekten arbeiten. Manager-Objekte unterstützen Python-Objekte list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array </ code>.

from multiprocessing import Process, Manager


def f(d, l, i):
    d[i] = i
    d[str(i)] = str(i)
    l.append(i)
    print(l)


with Manager() as manager:
    shared_dict = manager.dict()
    shared_list = manager.list()
    p_list = []
    #Erstellen Sie 10 Prozesse
    for i in range(10):
        p = Process(target=f, args=(shared_dict, shared_list, i))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()

    print('All subprocesses done.')
    print(shared_dict)
    print(shared_list)

Ausführungsergebnis:

[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6, 8]
[0, 1, 2, 3, 4, 5, 6, 8, 7]
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]
All subprocesses done.
{0: 0, '0': '0', 1: 1, '1': '1', 2: 2, '2': '2', 3: 3, '3': '3', 4: 4, '4': '4', 5: 5, '5': '5', 6: 6, '6': '6', 8: 8, '8': '8', 7: 7, '7': '7', 9: 9, '9': '9'}
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]

Ich habe versucht, eine Liste und ein Wörterbuch für die Freigabe zwischen Prozessen im Manager zu erstellen. Hier sehen Sie, dass die Prozesse nicht aufeinander folgen.

2-3-5 Verarbeitung der Prozesssperre

Prozesse haben wie Threads Sperren.

from multiprocessing import Process, Lock


def f(i):
    lock.acquire()
    try:
        print('hello world', i)
    finally:
        lock.release()


lock = Lock()

for num in range(10):
    Process(target=f, args=(num,)).start()

Ausführungsergebnis:

hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9

Aufgrund der Sperre werden die Zahlen im Gegensatz zur vorherigen Zeit in der angegebenen Reihenfolge ausgegeben. Sie können jedoch die Leistung von Multiprozessen nicht demonstrieren.

2-4 Verteilte Prozessverarbeitung

Python-Prozesse können auf mehreren Computern verarbeitet werden. Das Submodul manager </ code> des Moduls multiprocessing </ code> kann Prozesse auf mehrere Maschinen verteilen. Auch wenn Sie das Kommunikationsprotokoll nicht kennen, können Sie ein Programm für die verteilte Prozessverarbeitung schreiben.

Die verteilte Prozessverarbeitung erfordert einen Serverprozess, der Aufgaben verteilt, und einen Arbeitsprozess, der die Aufgaben tatsächlich verarbeitet. Implementieren Sie zunächst den Serverprozess task_master.py </ code>.

Hier veröffentlicht manager </ code> die Warteschlange als ** api ** im Internet. Sobald der Serverprozess die Warteschlange gestartet und die Aufgabe eingegeben hat, kann von anderen Computern aus auf sie zugegriffen werden.

task_master.py


import random
import queue  #Da es über das Netz erfolgt, ist die Standardbibliothekswarteschlange ausreichend
from multiprocessing.managers import BaseManager


#Warteschlange zum Senden von Aufgaben
task_queue = queue.Queue()
#Warteschlange, um Ergebnisse zu erhalten
result_queue = queue.Queue()


class QueueManager(BaseManager):
    pass


#Registrieren Sie zwei Warteschlangen als API
#Im Falle von Windows kann Lambda für die API-Registrierung verwendet werden. Bitte definieren Sie die Funktion gehorsam
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

#Verwenden Sie Port 5000 für die Authentifizierungsverschlüsselung'abc'Zu
#Für Windows muss die Adresse angegeben werden (127).0.0.1)
manager = QueueManager(address=('', 5000), authkey=b'abc')
#anfangen
manager.start()
#Warteschlangenobjekt über Netz abrufen
task = manager.get_task_queue()
result = manager.get_result_queue()

#Versuchen Sie, eine Aufgabe zu erledigen
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task {}...'.format(n))
    task.put(n)

#Empfangen Sie das Ergebnis aus der Ergebniswarteschlange
print('Try get results...')
for i in range(10):
    #Wenn es 10 Sekunden überschreitet, endet es mit einer Zeitüberschreitung
    r = result.get(timeout=10)
    print('Result: {}'.format(r))

#Ende
manager.shutdown()
print('master exit.')

Implementieren Sie als Nächstes die Datei task_worker.py </ code> für den Arbeitsprozess. Holen Sie sich die Aufgabe mit ** api ** namens manager.get_task_queue </ code>, die oben veröffentlicht wurde, und verarbeiten Sie sie.

task_worker.py


import time
import queue
from multiprocessing.managers import BaseManager


#Erstellen Sie denselben Warteschlangenmanager
class QueueManager(BaseManager):
    pass


#Holen Sie sich API aus dem Netz und registrieren Sie es in Queue Manager
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

#Stellen Sie eine Verbindung zum Server her
server_addr = '127.0.0.1'
print('Connect to server {}...'.format(server_addr))
#Stellen Sie den gleichen Port und die gleiche Authentifizierungsverschlüsselung ein
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
#Verbindung
m.connect()

#Holen Sie sich jede Warteschlange
task = m.get_task_queue()
result = m.get_result_queue()

#Empfangen Sie eine Aufgabe aus der Aufgabenwarteschlange
#Speichern Sie das Verarbeitungsergebnis in der Ergebniswarteschlange
for i in range(10):
    try:
        n = task.get(timeout=1)
        #Hier ist die Aufgabe die einfache Quadratberechnung.
        print('run task {} * {}...'.format(n, n))
        r = '{} * {} = {}'.format(n, n, n*n)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print('task queue is empty.')

#Ende
print('worker exit.')

Es kann auch auf einem lokalen Computer ausgeführt werden.

Ausführungsergebnis: Zuerst legt der Serverprozess die Aufgabe zuerst in der task_queue </ code> ab. Wenn Sie alles eingegeben haben, warten Sie auf die Ergebnisse in der result_queue </ code>.

task_master.py


Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...

Der Arbeitsprozess stellt dann eine Verbindung zum Server her, ruft die Aufgabe in der task_queue </ code> ab und verarbeitet sie. Das Verarbeitungsergebnis wird an result_queue </ code> gesendet.

task_worker.py


Connect to server 127.0.0.1...
run task 7710 * 7710...
run task 6743 * 6743...
run task 8458 * 8458...
run task 2439 * 2439...
run task 1351 * 1351...
run task 9885 * 9885...
run task 5532 * 5532...
run task 4181 * 4181...
run task 6093 * 6093...
run task 3815 * 3815...
worker exit.

Wenn das Ergebnis in result_queue </ code> eingeht, gibt der Serverprozess es der Reihe nach aus.

task_master.py


Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...
Result: 7710 * 7710 = 59444100
Result: 6743 * 6743 = 45468049
Result: 8458 * 8458 = 71537764
Result: 2439 * 2439 = 5948721
Result: 1351 * 1351 = 1825201
Result: 9885 * 9885 = 97713225
Result: 5532 * 5532 = 30603024
Result: 4181 * 4181 = 17480761
Result: 6093 * 6093 = 37124649
Result: 3815 * 3815 = 14554225
master exit.

Alle Warteschlangen befinden sich im Serverprozess, da der Arbeitsprozess die Warteschlange nicht erstellt.

Screen Shot 2020-02-27 at 0.58.21.png (Quelle: [Hiroyukimin-ähnliches Regierungsnetzwerk](https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600))

Auf diese Weise können verteilte Prozesse in Python realisiert werden. Leistungsstarke Rechenleistung kann durch Verarbeitung mit mehreren Mitarbeitern erzielt werden.

  1. subprocess Unter Unix-basierten Betriebssystemen wurde fork () </ code> erläutert, um eine Kopie des aktuellen Prozesses als untergeordneter Prozess zu erstellen. Das Aufrufen von os.fork </ code> in Python erstellt einen untergeordneten Prozess Ihres Python-Programms. Es gibt jedoch Situationen, in denen Sie einen untergeordneten Prozess benötigen, der externe Befehle anstelle eines Python-Programms ausführen kann. Unter Unix-basierten Betriebssystemen gibt es einen weiteren Systemaufruf exec () </ code>. Es ist in Python als os.execve </ code> implementiert. exec () </ code> ist eine Funktion, die derzeit den Prozess durch ein anderes Programm ersetzt. Das heißt, os.fork </ code> erstellt einen untergeordneten Prozess eines Python-Programms, und os.execve </ code> verwendet andere Programme ( ls </ code>, ls </ code>, die in der Shell ausgeführt werden können. Es kann durch ein Programm wie code> ping </ code> ersetzt werden. Der Unterprozess der Standardbibliothek </ code> ist ein Modul zum Erstellen untergeordneter Prozesse, die externe Programme ausführen. Wenn Sie dann ein externes Programm mit dem Unterprozess </ code> ausführen, erstellen Sie eine Pipe (Pipe) für die prozessübergreifende Kommunikation zwischen dem Python-Prozess und dem untergeordneten Prozess, übergeben Sie Parameter und senden Sie Rückgabewerte und Fehler. Sie können es empfangen.

3-1. subprocess.run Ab Python 3.5 wird offiziell empfohlen, den Befehl in subprocess.run </ code> auszuführen. Hier wird die Erklärung wie subprocess.call </ code> der alten ** api ** weggelassen.

subprocess.run(args, *, stdin=None, input=None, 
    stdout=None, stderr=None, shell=False, timeout=None, check=False, universal_newlines=False)

subprocess.run </ code> gibt eine Instanz der Klasse CompletedProcess </ code> zurück. Die Attribute der Klasse CompletedProcess </ code> lauten wie folgt.

Attribut Erläuterung
args An untergeordnete Prozesse übergebene Parameter, Zeichenfolge oder Liste
returncode Speichert den Statuscode nach der Ausführung
stdout Standardausgabe nach Ausführung
stderr Standardfehler nach der Ausführung
check_returncode() Löst CalledProcessError aus, wenn der Statuscode ungleich Null ist (Ausführungsfehler)

Hier einige Beispiele für die Verwendung von subprocess.run </ code>.

Sie können die Standardausgabe mit dem Unterprozess abfangen.PIPE </ code> (andernfalls wird die Ausgabe verworfen).

import subprocess


# subprocess.run(["ls", "-l"] stdout=subprocess.PIPE)Gleich wie
obj = subprocess.run(["ls", "-l"], stdout=subprocess.PIPE)
print('stdout:\n{}'.format(obj.stdout.decode()))

Ausführungsergebnis:

stdout:
total 128
-rw-r--r--@ 1 kaito  staff   692 Feb 16 19:35 1-1.py
-rw-r--r--@ 1 kaito  staff   509 Feb 17 23:39 1-2.py
-rw-r--r--@ 1 kaito  staff   364 Feb 19 16:48 2-10.py
-rw-r--r--@ 1 kaito  staff   645 Feb 19 19:12 2-17.py
-rw-r--r--@ 1 kaito  staff   213 Feb 19 19:14 2-18.py
-rw-r--r--@ 1 kaito  staff   209 Feb 19 19:18 2-19.py
-rw-r--r--@ 1 kaito  staff   318 Feb 19 23:53 2-20.py
-rw-r--r--@ 1 kaito  staff   194 Feb 19 23:57 2-21.py
-rw-r--r--@ 1 kaito  staff   230 Feb 20 15:46 2-23.py
-rw-r--r--@ 1 kaito  staff   131 Feb 18 19:39 2-4.py
-rw-r--r--@ 1 kaito  staff   543 Feb 18 19:50 2-8.py
-rw-r--r--@ 1 kaito  staff   240 Feb 18 22:29 2-9.py
-rw-r--r--  1 kaito  staff  1339 Feb 27 00:25 task_master.py
-rw-r--r--  1 kaito  staff  1086 Feb 27 00:31 task_worker.py
-rw-r--r--  1 kaito  staff   446 Feb 27 20:26 test.py
-rw-r--r--  1 kaito  staff   199 Feb 27 20:31 test2.py

Wenn check </ code> auf True gesetzt ist, tritt ein Fehler auf, wenn der Statuscode ungleich Null ist.

subprocess.run("exit 1", shell=True, check=True)

Ausführungsergebnis:

Traceback (most recent call last):
  File "test2.py", line 4, in <module>
    subprocess.run("exit 1", shell=True, check=True)
  File "/Users/kaito/opt/miniconda3/lib/python3.7/subprocess.py", line 487, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1.

Die \ _ \ _ repr \ _ \ _ </ code> der Klasse CompletedProcess </ code> sieht folgendermaßen aus.

print(subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE))

Ausführungsergebnis:

CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0, stdout=b'crw-rw-rw-  1 root  wheel    3,   2 Feb 27 20:37 /dev/null\n')

3-2. subprocess.Popen Für erweiterte Vorgänge können Sie die Klasse subprocess.Popen </ code> verwenden.

class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, 
    preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False,
    startup_info=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=())

Die Methode der Klasse subprocess.Popen </ code> lautet wie folgt.

Methode Erläuterung
poll Gibt einen Statuscode zurück, wenn der untergeordnete Prozess die Ausführung beendet hat, und gibt None zurück, wenn er nicht abgeschlossen wurde
wait Warten Sie, bis der untergeordnete Prozess die Ausführung abgeschlossen hat, und lösen Sie einen TimeoutExpired-Fehler aus, wenn ein Timeout auftritt
communicate Kommunizieren Sie mit untergeordneten Prozessen
send_signal Senden Sie ein Signal an einen untergeordneten Prozess, z. B. ein Signal.signal(signal.SIGINT)Ist die Befehlszeile des UNIX-basierten Betriebssystems Strg+Signal beim Drücken von C.
terminate Beenden Sie den untergeordneten Prozess
kill Töte den Kinderprozess

Hier einige Beispiele für die Verwendung des Unterprozesses . Öffnen Sie </ code>.

Sie können Ihren Python-Code als externes Programm ausführen.

import subprocess


#Verbinden Sie die Rohre mit Standardeingängen, Standardausgängen und Standardfehlern
p = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#Schreiben Sie Daten in die Standardeingabe
p.stdin.write(b'print("stdin")\n')
#Übergeben Sie Daten als Eingabe für die Kommunikation
out, err = p.communicate(input=b'print("communicate")\n')
print(out.decode())

Ausführungsergebnis:

stdin
communicate

Die Pipeline-Verarbeitung mit | </ code> kann erstellt werden, indem die Standardausgabe und die Standardeingabe von zwei untergeordneten Prozessen mit einer Pipe verbunden werden.

#Leiten Sie die beiden untergeordneten Prozesse zusammen
p1 = subprocess.Popen(['df', '-h'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'Data'], stdin=p1.stdout, stdout=subprocess.PIPE)
out, err = p2.communicate()  # df -h | grep Data
print(out.decode())

Ausführungsergebnis:

/dev/disk1s1   466Gi  438Gi  8.0Gi    99% 1156881 4881295959    0%   /System/Volumes/Data
map auto_home    0Bi    0Bi    0Bi   100%       0          0  100%   /System/Volumes/Data/home

  1. concurrent.futures Wir haben Ihnen Pythons Multithreading und Multiprocessing vorgestellt. Sie haben vielleicht ein Bild, das etwas kompliziert und schwer zu verstehen ist, aber das ist wahr (lacht). [Go](https://ja.wikipedia.org/wiki/Go_(%E3%83%97%E3%83%AD%E3%82%B0%E3%83%A9%E3%83%9F%E3 Sprachen mit einer Designphilosophie der einfachen parallelen / parallelen Verarbeitung von Anfang an, wie% 83% B3% E3% 82% B0% E8% A8% 80% E8% AA% 9E)), geben die Entwicklungsrichtung der Programmiersprachen an. Es kann sein.

Die Entwicklung von Python hat jedoch noch nicht aufgehört. Derzeit wird ein übergeordnetes Modul entwickelt, das gleichzeitiges </ code> Threading </ code> und Multiprocessing </ code> weiter verkapselt, um die Verwendung zu vereinfachen.

Der aktuelle gleichzeitig </ code> hat nur ein Modul namens futures </ code>. futures </ code> ist [Future pattern](https://ja.wikipedia.org/wiki/Future_%E3%83%91%E3%82%BF%E3%83%BC%E3%83% B3) Python-Implementierung. Hier möchte ich die Funktionen vorstellen, die derzeit verwendet werden können.

4-1. Vollstrecker und Zukunft

concurrent.futures </ code> bietet ThreadPoolExecutor </ code> und ProcessPoolExecutor </ code>, die von der Klasse Executor </ code> erben. Werden. ThreadPoolExecutor </ code> und ProcessPoolExecutor </ code> erhalten ein Argument namens max_works </ code>, das die Anzahl der Threads oder Prozesse angibt. Führt eine einzelne Aufgabe mit der Methode submit </ code> aus und gibt eine Instanz der Klasse Future </ code> zurück.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests


def load_url(url):
    return requests.get(url)


if __name__ == '__main__':
    url = 'https://www.python.org/'
    executor = ProcessPoolExecutor(max_workers=4)  # ThreadPoolExecutor(max_workers=4)
    future = executor.submit(load_url, url)
    print(future)
    while 1:
        if future.done():
            print('status code: {}'.format(future.result().status_code))
            break

Ausführungsergebnis:

<Future at 0x10ae058d0 state=running>
status code: 200

Eine einfache http-Anfrage. Beachten Sie, dass bei Verwendung von ProcessPoolExecutor </ code> das Modul \ _ \ _ main__ </ code> erforderlich ist. Führen Sie es daher nicht in einer REPL-Umgebung aus.

The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.

4-2. Map, as_completed und warte

Die Methode submit </ code> kann nur eine Aufgabe ausführen. Wenn Sie also mehrere Aufgaben ausführen möchten, müssen Sie map </ code>, as_completed </ code> und wait <ausführen Verwenden Sie / code>.

Die Methode map </ code> verwendet eine Ausführungsfunktion und -sequenz als Argumente und gibt einen Generator für Ausführungsergebnisse zurück.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests


URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']


def load_url(url):
    return requests.get(url)


if __name__ == '__main__':
    # with ThreadPoolExecutor(max_workers=4) as executor:
    with ProcessPoolExecutor(max_workers=4) as executor:
        for url, data in zip(URLS, executor.map(load_url, URLS)):
            print('{} - status_code {}'.format(url, data.status_code))

Ausführungsergebnis:

https://google.com - status_code 200
https://www.python.org/ - status_code 200
https://api.github.com/ - status_code 200

Die Methode as_completed </ code> gibt einen Generator für das Objekt Future </ code> zurück. Und es blockiert, wenn die Aufgabe nicht abgeschlossen ist.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import requests


URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']


def load_url(url):
    return url, requests.get(url).status_code


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        for future in as_completed(tasks):
            print(*future.result())

Ausführungsergebnis:

https://google.com 200
https://www.python.org/ 200
https://api.github.com/ 200

Die Methode wait </ code> blockiert den Hauptthread und den Hauptprozess. Sie können drei Bedingungen mit dem Argument return_when </ code> festlegen.

Bedingungen Erläuterung
ALL_COMPLETED Blockierung freigeben, wenn alle Aufgaben abgeschlossen sind
FIRST_COMPLETED Lösen Sie die Blockierung, wenn eine Aufgabe abgeschlossen ist
FIRST_EXCEPTION Blockierung aufheben, wenn eine Aufgabe einen Fehler verursacht
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED
import requests


URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']


def load_url(url):
    requests.get(url)
    print(url)


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        wait(tasks, return_when=ALL_COMPLETED)
        print('all completed.')  #Nach 3 Drucken wird der Hauptprozess zum Drucken freigegeben

Ausführungsergebnis:

https://www.python.org/
https://api.github.com/
https://google.com
all completed.

Referenz

Parallele Ausführung Threading --- Threading-Parallelverarbeitung Multiprocessing --- Prozessbasierte Parallelverarbeitung Subprozess --- Subprozessverwaltung concurrent.futures - Parallele Taskausführung

Recommended Posts