Compréhension complète du threading Python et du multitraitement

threading et multitraitement

Les principaux systèmes d'exploitation modernes sont Mac OS, UNIX, Linux et Windows. Ces OS prennent en charge la fonction "multitâche".

Qu'est-ce que le multitâche? Vous pourriez penser, par exemple, dans une situation où vous lancez un navigateur, écoutez de la musique et rédigez un rapport dans Word, au moins trois tâches sont en cours en même temps. Et en plus des tâches à l'avant, diverses tâches liées au système d'exploitation s'exécutent secrètement dans les coulisses.

Il est facile de comprendre qu'un processeur multicœur peut gérer le multitâche, mais un processeur monocœur peut également gérer le multitâche. Le système d'exploitation exécute chaque tâche à tour de rôle. Par exemple, la tâche 1 est de 0,01 seconde, la tâche 2 de 0,01 seconde, la tâche 3 de 0,01 seconde, la tâche 1 de 0,01 seconde, et ainsi de suite. Le processeur est rapide, on a donc l'impression qu'il est presque simultané. Cette exécution alternative est souvent appelée ["calcul simultané"](https://ja.wikipedia.org/wiki/%E4%B8%A6%E8%A1%8C%E8%A8%88%E7 % AE% 97).

Bien entendu, les processeurs monocœur sont exécutés à leur tour, donc dans le vrai sens du terme, la progression simultanée n'est possible que pour les processeurs multicœurs. Le traitement simultané de plusieurs tâches sur chaque cœur au moment d'un processeur multicœur est "[calcul parallèle](https://ja.wikipedia.org/wiki/%E4%B8%A6%E5%] 88% 97% E8% A8% 88% E7% AE% 97) ". Dans la plupart des cas, le nombre de tâches exécutées dépasse de loin le nombre de cœurs, de sorte que le travail d '«exécution par équipes» est également effectué en multicœur.

Pour le système d'exploitation, une tâche est un processus. Par exemple, le lancement d'un navigateur crée un processus de navigateur unique. De même, lorsque vous ouvrez Word, un processus Word est créé.

Un processus n'est pas nécessairement un processus. Par exemple, Word effectue de nombreux traitements, tels que la surveillance des entrées utilisateur, la vérification orthographique et l'affichage de l'interface utilisateur. Ces «sous-tâches» sont appelées Threads. Il y a au moins un thread par processus. Lorsqu'il y a plusieurs threads, ils se relaient comme des processus.

Il existe deux façons principales de gérer le multitâche en Python en même temps.

Bien sûr, vous pouvez avoir plusieurs threads dans plusieurs processus, mais cela n'est pas recommandé en raison de la complexité du modèle.

Lors du traitement du multitâche, la communication et la coopération entre les tâches peuvent être nécessaires, la tâche 1 peut devoir être interrompue lorsque la tâche 2 est exécutée, et la tâche 3 et la tâche 4 peuvent ne pas pouvoir se poursuivre en même temps. , Le programme devient un peu compliqué.

スレッドとプロセスの関係+プロセスはOSから割り当てられた様々なリソースを持っている..jpg (Source: Présentation de la conférence sur les logiciels système)

  1. threading Dans un système d'exploitation basé sur Unix, les fonctions d'appel système suivantes peuvent être principalement utilisées autour des threads suivants.
une fonction La description
start() Démarrer un fil
setName() Donnez un nom au fil
getName() Obtenez le nom du fil
setDaemon(True) FildémonÀ
join() Attendez que le thread termine le traitement
run() Exécuter manuellement le traitement des threads

Les threads Python ne sont pas simulés par des processus, ce sont de vrais [threads POSIX](https://ja.wikipedia.org/wiki/POSIX%E3%82%B9%E3%83%AC%E3%83% 83% E3% 83% 89). Depuis la bibliothèque standard, vous pouvez utiliser deux modules, _thread </ code> et threading </ code>. Et _thread </ code> est un module de bas niveau, et threading </ code> est un module qui l'encapsule. J'utilise donc généralement threading </ code>.

1-1. Instanciation

Vous pouvez démarrer un thread en introduisant une fonction, etc., en créant une instance de Thread </ code> et en le démarrant avec start </ code>.

import threading
import time


def run(n):
    # threading.current_thread().le nom est getName()Appel
    print("task: {} (thread name: {})".format(n, threading.current_thread().name))
    time.sleep(1)
    print('2s')
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)


t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",), name='Thread T2') #Ici setName()Est appelé
# start()
t1.start()
t2.start()
# join()
t1.join()
t2.join()
# join()Parce que j'ai appelé
#Le thread principal attend que le thread ci-dessus se termine
#Imprimez quand tout est fait
print(threading.current_thread().name)

Résultat de l'exécution:

task: t1 (thread name: Thread-1)
task: t2 (thread name: Thread T2)
2s
2s
1s
1s
0s
0s
MainThread

Vous pouvez voir que t1 et t2 sont exécutés en alternance. Une des règles d'alternance sera expliquée plus en détail dans GIL après l'opération IO (où l'opération print </ code> s'applique).

1-2. Personnaliser

Il est également possible d'hériter de Thread </ code> et de personnaliser la méthode run </ code> de la classe de thread.

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()
        self.n = n

    # run()Récrire
    def run(self):
        print("task: {}".format(self.n))
        time.sleep(1)
        print('2s')
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)


t1 = MyThread("t1")
t2 = MyThread("t2")

t1.start()
t2.start()

Résultat de l'exécution:

task: t1
task: t2
2s
2s
1s
1s
0s
0s

1-3. Calculez le nombre de threads

Vous pouvez compter le nombre de threads actifs avec active_count </ code>. Toutefois, dans un environnement REPL, il existe plusieurs threads à surveiller, de sorte que le nombre de threads sera plus élevé que prévu.

Exécutez le code suivant dans un script.

import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(1)


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    t.start()

time.sleep(0.5)
print(threading.active_count())

Résultat de l'exécution:

task: t0
task: t1
task: t2
4

Lorsque le thread principal print </ code> est exécuté, le nombre de threads = 3 + 1 (thread principal) car d'autres threads sont toujours en cours d'exécution.

import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(0.5)


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    t.start()

time.sleep(1)
print(threading.active_count())

Résultat de l'exécution:

task: t0
task: t1
task: t2
1

En ajustant le temps d'exécution et en retardant l'impression </ code> du thread principal, le nombre de threads actifs devient 1 uniquement pour le thread principal.

1-4. Fil de démon

Démarrez le thread en tant que démon.

import threading
import time


def run(n):
    print("task: {}".format(n))
    time.sleep(1)
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')


for i in range(3):
    t = threading.Thread(target=run, args=("t{}".format(i),))
    # setDaemon(True)
    t.setDaemon(True) 
    t.start()

time.sleep(1.5)
print('Le nombre de fils: {}'.format(threading.active_count()))

Résultat de l'exécution:

task: t0
task: t1
task: t2
3
3
3
Le nombre de fils: 4

Puisque t1, t2 et t3 sont définis comme thread démon du thread principal, ils s'arrêtent lorsque le thread principal se termine. Par exemple, la vérification orthographique de Word est un thread démon, qui s'exécute dans une boucle infinie, mais lorsque le thread principal tombe en panne, il s'arrête. 1-5. GIL Lors de l'utilisation d'un processeur multicœur dans d'autres langages de programmation, les threads avec le nombre de cœurs peuvent être exécutés en même temps. Cependant, en Python, un seul thread s'exécute à la fois, ce qui est un processus. En d'autres termes, le multithreading Python est complètement parallèle. La raison est [GIL (Global Interpreter Lock)](https://ja.wikipedia.org/wiki/%E3%82%B0%E3%83%AD%E3%83%BC%E3%83%90%E3 % 83% AB% E3% 82% A4% E3% 83% B3% E3% 82% BF% E3% 83% 97% E3% 83% AA% E3% 82% BF% E3% 83% AD% E3% 83 Il est situé à% 83% E3% 82% AF).

GIL est un type de contrôle exclusif (expliqué plus loin). Lorsque Python a été conçu pour la première fois, nous avons implémenté GIL pour le rendre plus facile à combiner avec la sécurité des données et les bibliothèques de langage C. Lorsque vous exécutez un thread, vous devez obtenir un GIL. Il n'y a qu'un seul processus Python dans un interpréteur Python. Et comme il n'y a qu'un seul GIL dans un processus Python, un seul thread peut être exécuté à la fois. GIL est comme un passeport, et les threads qui n'ont pas de GIL ne peuvent pas entrer dans le CPU. Au fait, GIL est en CPython (distribution Python normale), mais pas en PyPy et Jython. Un autre langage bien connu avec GIL est Ruby.

1-5-1. Procédure multithreading en CPython

  1. Obtenez des ressources
  2. Demande de GIL
  3. L'interpréteur Python achète des threads natifs du système d'exploitation
  4. Le système d'exploitation utilise le processeur pour calculer
  5. Une fois les règles de collecte des GIL respectées, les GIL seront collectés pour voir si le calcul est terminé.
  6. Un autre fil répète les étapes ci-dessus
  7. Lorsque le GIL revient, traitez la suite du précédent jusqu'à ce que la règle de récupération GIL soit à nouveau remplie (changement de contexte).

1-5-2. Différentes versions des règles de récupération GIL

  • Python 2.X
  • Collecte lorsque l'opération IO se produit
  • Collecter lorsque les tiques atteignent 100
  • les ticks sont des compteurs pour GIL qui enregistrent le nombre de processus virtuels Python
  • Lorsqu'il atteint 100, GIL est collecté et remis à 0.
  • Vous pouvez définir le seuil avec sys.setcheckinterval </ code>
  • Python 3.X
  • les tiques ont été rejetées
  • Mesurer le temps avec une minuterie et recueillir lorsque le seuil est dépassé

À titre expérimental, exécutons une simple boucle infinie.

import threading
import multiprocessing


def loop():
    x = 0
    while True:
        x = x ^ 1


for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()
Screen Shot 2020-02-26 at 19.02.51.png

Comme vous pouvez le voir, à cause de GIL, l'utilisation du processeur n'est que d'environ 100% en un seul processus, peu importe vos efforts (il devrait être disponible jusqu'à 400% sur un processeur quad core).

1-5-3. Efficacité de calcul des programmes Python pour différents types de tâches

  • Tâche liée au processeur
  • Après un certain laps de temps, le GIL sera collecté et le fil sera changé, ce qui augmentera le coût de calcul et ralentira.
  • Tâche liée IO
  • Basculez les threads chaque fois qu'une opération IO est effectuée. Il est efficace car il peut être utilisé pour d'autres traitements sans attendre la lecture et l'écriture lentes des fichiers.
  • Si vous souhaitez tirer le meilleur parti de votre processeur multicœur, le multitraitement est recommandé. Chaque processus a son propre GIL.

1-6. Contrôle du fil

Les ressources sont partagées entre les threads dans le même processus. Et comme les threads sont commutés au hasard et dans le désordre, les données peuvent être perturbées.

import threading


#Économiser de l'argent
balance = 0


def change_it(n):
    #Le retrait et le dépôt doivent être de 0
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    for i in range(100000):
        change_it(n)


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

Comme vous pouvez le voir en exécutant le code ci-dessus plusieurs fois, le résultat est différent de zéro.

balance = balance + n </ code> peut être divisé en deux opérations atomiques.

x = balance + n
balance = x

Le x </ code> ici est une variable locale, et chaque thread a son propre x </ code>. Lorsque le code ci-dessus est exécuté dans l'ordre, il devient comme suit.

balance = 0 #valeur initiale

t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1     # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1     # balance = 0

t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2     # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2     # balance = 0
    
balance = 0 #Le résultat est correct

Cependant, si l'ordre est différent, le résultat sera différent.

balance = 0 #valeur initiale

t1: x1 = balance + 5  # x1 = 0 + 5 = 5

t2: x2 = balance + 8  # x2 = 0 + 8 = 8
t2: balance = x2      # balance = 8

t1: balance = x1      # balance = 5
t1: x1 = balance - 5  # x1 = 5 - 5 = 0
t1: balance = x1      # balance = 0

t2: x2 = balance - 8  # x2 = 0 - 8 = -8
t2: balance = x2   # balance = -8

balance = -8 #Le résultat est faux

Ce phénomène de calcul imprévisible entraîne le multithreading est appelé Thread-unsafe.

Pour résoudre ce problème, vous devez verrouiller et contrôler le thread.

1-6-1. Contrôle exclusif (mutex)

import threading


#Économiser de l'argent
balance = 0


def change_it(n):
    #Obtenez la serrure
    lock.acquire()
    global balance
    balance = balance + n
    balance = balance - n
    #Relâchez le verrou
    lock.release()


def run_thread(n):
    for i in range(100000):
        change_it(n)


lock = threading.Lock()  #Instancier un verrou

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

En utilisant le contrôle exclusif, aucun autre thread ne peut accéder à la ressource tant que le verrou n'est pas libéré. En faisant cela, le résultat du calcul sera toujours 0.

1-6-2. Contrôle exclusif récursif

Contrôle exclusif qui peut libérer de manière récursive les verrous imbriqués.

import threading


#Économiser de l'argent
balance = 0


def add_it(n):
    lock.acquire()
    global balance
    balance = balance + n
    return balance


def sub_it(n):
    lock.acquire()
    global balance
    balance = balance - n
    return balance


def change_it(n):
    #Obtenez la serrure
    lock.acquire()
    global balance
    balance = add_it(n)
    balance = sub_it(n)
    #Verrouillage de libération récursive
    lock.release()


def run_thread(n):
    for i in range(1000):
        change_it(n)


lock = threading.RLock()  #Instancier un verrou

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

Ici, le verrou est également acquis dans add_it </ code> et sub_it </ code>. En utilisant le contrôle exclusif récursif, il n'est pas nécessaire de déverrouiller chaque verrou, et tout peut être libéré en un seul coup. Cependant, il est très coûteux en calcul, nous réduisons donc le nombre de boucles.

1-6-3. Contrôle BoundedSemaphore

Avec un contrôle exclusif, les ressources ne peuvent être traitées que par un seul thread à un certain moment, tandis que le sémapho est une limite qui permet le traitement simultané d'un certain nombre de threads. Par exemple, une situation où il y a trois sièges de toilette dans les toilettes, trois personnes les utilisent en même temps et d'autres font la queue est un sémapho.

import threading
import time


def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("current thread: {}\n".format(n))
    semaphore.release()


semaphore = threading.BoundedSemaphore(5)  #Permet le traitement simultané de 5 threads

for i in range(22):
    t = threading.Thread(target=run, args=("t-{}".format(i),))
    t.start()

while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('-----Tous les fils sont terminés-----')

Lorsque vous exécutez le code ci-dessus, vous pouvez voir que les chaînes de thread actuelles sont sorties cinq par cinq.

1-6-4. Contrôle des événements

Les événements de thread permettent au thread principal de contrôler d'autres threads. Les méthodes suivantes sont fournies pour Event </ code>.

Méthode La description
clear définir l'indicateur sur False
set définir l'indicateur sur True
is_set Renvoie True lorsque l'indicateur est True
wait Continuez à surveiller le drapeau; blocage lorsque le drapeau est faux
import threading
import time

event = threading.Event()


def lighter():
    '''
    flag=True:Lumière verte
    flag=False:lumière rouge
    '''
    count = 0
    event.set()  #La valeur initiale est la lumière verte
    while True:
        if 5 < count <= 10:
            event.clear()  #Fais une lumière rouge
            print("\33[41;1m lumière rouge...\033[0m")
        elif count > 10:
            event.set()  #Fais un feu vert
            count = 0
        else:
            print("\33[42;1m feu vert...\033[0m")

        time.sleep(1)
        count += 1


def car(name):
    while True:
        if event.is_set():  #Vérifiez si le voyant vert est
            print("[{}]Avance...".format(name))
            time.sleep(1)
        else:
            print("[{}]Attendez le signal à cause du feu rouge...".format(name))
            event.wait()
            # flag=Bloquer ici jusqu'à ce que True
            print("[{}]Commencez à avancer en raison du feu vert...".format(name))


light = threading.Thread(target=lighter,)
light.start()

car = threading.Thread(target=car, args=("MINI",))
car.start()

Avec le code ci-dessus, nous avons réalisé une communication simple entre le signal et le fil de la voiture lors de l'événement.

1-6-5. Contrôle de la minuterie

Vous pouvez également utiliser une minuterie pour contrôler le thread par heure.

from threading import Timer


def hello():
    print("hello, world")


t = Timer(1, hello)
t.start()  #Hello est exécuté après 1 seconde

1-6-6. Contrôle des conditions

Il existe également une méthode pour contrôler le thread en jugeant la condition. Les méthodes suivantes sont fournies dans Condition </ code>.

Méthode La description
wait Suspendre le fil jusqu'à ce qu'il soit notifié ou lorsque le délai d'expiration de l'argument est atteint
notify Fil suspendu (par défaut n=Notifier 1); ne peut être utilisé qu'avec le verrou acquis
notifyAll Notifier tous les threads bloqués
import threading
import time
from random import randint
from collections import deque


class Producer(threading.Thread):
    def run(self):
        global stocks
        while True:
            if lock_con.acquire():
                products = [randint(0, 100) for _ in range(5)]
                stocks.extend(products)
                print('Producteur{}Est{}Produit.'.format(self.name, stocks))
                lock_con.notify()
                lock_con.release()
            time.sleep(3)


class Consumer(threading.Thread):
    def run(self):
        global stocks
        while True:
            lock_con.acquire()
            if len(stocks) == 0:
                #Attendez qu'il soit produit lorsque le produit est épuisé
                #Suspendre le fil jusqu'à ce qu'il ne soit pas confirmé
                lock_con.wait()
            print('Client{}Est{}Acheté. Stock: {}'.format(self.name, stocks.popleft(), stocks))
            lock_con.release()
            time.sleep(0.5)


stocks = deque()
lock_con = threading.Condition()
p = Producer()
c = Consumer()
p.start()
c.start()

Résultat de l'exécution:

Fil du producteur-1 est deque([73, 2, 93, 52, 21])Produit.
Fil client-2 achetés 73. Stock: deque([2, 93, 52, 21])
Fil client-2 achetés 2. Stock: deque([93, 52, 21])
Fil client-2 achetés 93. Stock: deque([52, 21])
Fil client-2 achetés 52. Stock: deque([21])
Fil client-2 achetés 21. Stock: deque([])
Fil du producteur-1 est deque([6, 42, 85, 56, 76])Produit.
Fil client-2 achetés 6. Stock: deque([42, 85, 56, 76])
Fil client-2 achetés 42. Stock: deque([85, 56, 76])
Fil client-2 achetés 85. Stock: deque([56, 76])
Fil client-2 achetés 56. Stock: deque([76])
Fil client-2 achetés 76. Stock: deque([])

C'est un programme simple dans lequel le producteur fabrique 5 produits lorsque le client achète tout le stock.

1-6-7. Contrôle des barrières

C'est un contrôle qui est exécuté collectivement lorsque le nombre spécifié de threads passe à travers la barrière. Par exemple, dans un jeu de match en ligne, vous pouvez implémenter une barrière qui attend pendant un certain temps jusqu'à ce que l'équipe atteigne un nombre spécifié de personnes. Les méthodes suivantes sont fournies dans Barrière </ code>.

Méthode La description
wait Les threads traversent la barrière; une fois que le nombre spécifié de threads est passé, tous les threads en attente sont libérés
reset Vider la barrière; renvoyer BrokenBarrierError au thread en attente
abort A brisé la barrière pour rompre l'état; tous les threads actuels sont terminés; retourne BrokenBarrierError aux threads qui tentent de passer à travers la barrière après cela
import threading

num = 4


def start():
    print('{}Depuis que je suis devenu une personne, le jeu a commencé.'.format(num))


lock = threading.Lock()
barrier = threading.Barrier(num, action=start)


class Player(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if not barrier.broken:
                print('{}Participé.'.format(self.name))
                barrier.wait(2)
        except threading.BrokenBarrierError:
            print('Parce que le jeu ne peut pas démarrer{}Est parti.'.format(self.name))


players = []
for i in range(10):
    lock = threading.Lock()
    p = Player(name='Player {}'.format(i))
    players.append(p)

for p in players:
    p.start()

Résultat d'exécution

Le joueur 0 a participé.
Le joueur 1 a participé.
Le joueur 2 a participé.
Le joueur 3 a participé.
Le jeu a commencé parce qu'il y avait 4 personnes.
Le joueur 4 a participé.
Le joueur 5 a participé.
Le joueur 6 a participé.
Le joueur 7 a participé.
Le jeu a commencé parce qu'il y avait 4 personnes.
Le joueur 8 a participé.
Le joueur 9 a participé.
Le joueur 8 est parti car le jeu ne peut pas démarrer.
Le joueur 9 est parti car le jeu ne peut pas démarrer.

Les threads sont exécutés de manière aléatoire, ils ne sont donc pas toujours sortis dans l'ordre ci-dessus. Ici, les équipes Joueur 8 et Joueur 9 (barrières) ont été forcées de partir (BrokenBarrierError) car elles n'ont pas atteint le nombre spécifié à temps.

1-7. ThreadLocal J'ai expliqué que parce que les données entre les threads sont partagées, vous devez les verrouiller pour calculer la sortie exacte. Cependant, il arrive que vous souhaitiez que chaque thread traite ses propres variables locales.

import threading


#Créer un objet ThreadLocal dans une portée globale
local_school = threading.local()

def process_student():
    #Gagnez des étudiants liés au fil actuel
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    #Lier le nom à l'élève dans ThreadLocal
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

Résultat de l'exécution:

Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

Le local_school </ code> est ici une variable globale, mais comme il s'agit d'un objet ThreadLocal </ code>, la variable d'instance student </ code> peut être définie sans affecter l'autre de chaque thread. Vous pouvez le faire fonctionner. Vous pouvez regarder local_school </ code> comme un dictionnaire et lier enseignant </ code> ainsi que étudiant </ code>. Et chaque thread peut fonctionner de manière arbitraire et ne s’affecte pas. En tant qu'utilisation de ThreadLocal </ code>, vous pouvez créer votre propre connexion DB, requête http, etc. pour chaque thread. Du point de vue d'un thread, toutes les données reçues sont comme une variable locale et peuvent être manipulées par n'importe quel autre thread.

  1. multiprocessing Sur un système d'exploitation basé sur Unix, vous pouvez créer un processus avec l'appel système fork () </ code>. L'appel de fork () </ code> copiera le processus en cours. Le processus copié est appelé processus enfant et le processus d'origine devient son processus parent. La valeur de retour de fork () </ code> est renvoyée à la fois au processus enfant et au processus parent. Et la valeur de retour du processus enfant est 0, et l'ID du processus enfant est retourné dans le processus parent. La raison en est que le processus parent doit enregistrer l'ID du processus enfant. Vous pouvez obtenir l'ID du processus parent du processus enfant avec getppid </ code>.

Le module Python OS </ code> encapsule le système d'appel système.

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

Résultat de l'exécution:

Process (19148) start...
I (19148) just created a child process (19149).
I am child process (19149) and my parent is 19148.

Ici, le processus parent et le processus enfant entrent dans des branches conditionnelles différentes. Veuillez noter que Windows n'a pas d'appel système fork () </ code> et ne peut pas être exécuté.

En utilisant fork () </ code>, lorsqu'un processus prend une nouvelle tâche, un nouveau processus peut être créé et traité. Par exemple, le serveur Apache bien connu permet au processus parent de surveiller le port et fork () </ code> pour laisser le processus enfant gérer les nouvelles requêtes http.

Lors de l'écriture de programmes multi-processus Python, il est recommandé d'utiliser le module multiprocessing </ code> de la bibliothèque standard. Le module multiprocessing </ code> est un module qui peut être traité en parallèle. On dit aussi que le module multiprocessing </ code> a été implémenté car le module threading </ code> ne peut pas être traité en parallèle à cause de GIL.

Le module multiprocessing </ code> est également multiplateforme, vous permettant de créer des programmes multi-processus sous Windows. Comme mentionné ci-dessus, Windows n'a pas de fork () </ code>, donc lors de la création d'un processus avec le module multiprocessing </ code>, un pseudo fork () </ code> Je traite. La façon de faire est de sérialiser tous les objets Python dans le processus parent avec Pickle </ code> et de les transmettre au processus enfant. Ainsi, si l'appel au module multiprocessing </ code> échoue sous Windows, le Pickle </ code> peut avoir échoué.

Si vous souhaitez créer un processus enfant et exécuter une commande externe, vous pouvez utiliser le sous-processus </ code> de la bibliothèque standard, mais ici, d'abord, le traitement Python est effectué dans le module multi-processus multiprocessing </ code>. Je vais vous présenter la fonction.

2-1. Processus

Vous pouvez facilement créer des processus enfants à l'aide de processus.

from multiprocessing import Process
import os


#Ce que fait le processus enfant
def run_proc(name):
    print('Run child process {} ({})...'.format(name, os.getpid()))


print('Parent process {}.'.format(os.getpid()))
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')

Résultat de l'exécution:

Parent process 19218.
Child process will start.
Run child process test (19219)...
Child process end.

Passez la fonction d'exécution et les arguments à Process </ code>, créez une instance et démarrez-la avec start </ code>. Vous pouvez facilement créer un processus enfant à partir de fork () </ code>. En utilisant join </ code> ici, le processus parent attend que le processus enfant finisse de s'exécuter, tout comme lorsqu'il s'agit d'un thread.

2-2. Pool de processus

La création d'un processus enfant est très coûteuse en calcul, donc si vous souhaitez créer un grand nombre de processus, il est plus efficace de créer un pool de processus avec Pool </ code>. Les principales méthodes de Pool </ code> sont les suivantes.

Méthode La description
apply Traitement synchrone
apply_async Traitement asynchrone
terminate Quittez immédiatement
join Le processus parent attend la fin du traitement du processus enfant; la jointure de processus ne peut être effectuée qu'après la fermeture ou la fin
close Quitter lorsque tous les processus sont terminés
from multiprocessing import Pool
import os
import time
import random


def long_time_task(name):
    print('Run task {} ({})...'.format(name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task {} runs {} seconds.'.format(name, (end - start)))


print('Parent process {}.'.format(os.getpid()))
p = Pool(4)  #Jusqu'à 4 processus enfants en même temps
for i in range(5):
    p.apply_async(long_time_task, args=(i,))
#En raison du traitement asynchrone, le processus parent n'a pas à attendre le traitement du processus enfant.
#Faites la prochaine impression
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')

Résultat de l'exécution:

Parent process 19348.
Waiting for all subprocesses done...
Run task 0 (19349)...
Run task 1 (19350)...
Run task 2 (19351)...
Run task 3 (19352)...
Task 1 runs 0.8950300216674805 seconds.
Run task 4 (19350)...
Task 2 runs 1.0132842063903809 seconds.
Task 4 runs 0.3936619758605957 seconds.
Task 3 runs 2.3689510822296143 seconds.
Task 0 runs 2.776203155517578 seconds.
All subprocesses done.

Étant donné que la taille du pool est de 4, la tâche 4 commencera à s'exécuter après la fin de l'une des tâches 0 à 3.

2-3. Communication inter-processus

Contrairement aux threads, les données ne sont pas partagées entre les processus. Le système d'exploitation propose de nombreuses méthodes de communication inter-processus. multiprocessing </ code> encapsule les fonctionnalités de bas niveau du système d'exploitation pour une facilité d'utilisation.

2-3-1. File d'attente

Les files d'attente de structure de données FIFO sont souvent utilisées pour la communication inter-processus.

from multiprocessing import Process, Queue
import os
import time
import random


#Ecrire des données dans la file d'attente
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put {} to queue...'.format(value))
        q.put(value)
        time.sleep(random.random())


#Lire les données de la file d'attente
def read(q):
    print('Process to read: {}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get {} from queue.'.format(value))


#Le processus parent crée une file d'attente et la transmet au processus enfant
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
#Démarrez pw et commencez à écrire
pw.start()
#Commencez à lire et commencez à lire
pr.start()
#Attendez que pw se termine
pw.join()
#pr est une boucle infinie, alors tuez
pr.terminate()

Résultat de l'exécution:

Process to write: 19489
Put A to queue...
Process to read: 19490
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Même si la lecture est lente, elle peut être récupérée dans le bon ordre car il s'agit d'un FIFO.

2-3-2. Tuyau

Comme son nom l'indique, les tuyaux peuvent être considérés comme des structures de données en forme de tuyaux. Les données sont transmises en plaçant les données d'un côté du tube (méthode send </ code>) et en recevant les données de l'autre côté (méthode recv </ code>). Veuillez noter que les données peuvent être corrompues si deux processus placent ou reçoivent des données du même type en même temps.

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()

Résultat de l'exécution:

[42, None, 'hello']

2-3-3. Mémoire partagée

J'ai expliqué que les données entre les processus ne sont pas partagées, mais c'est en fait un mensonge ... En fonction du système d'exploitation, [mémoire partagée] entre les processus (https://ja.wikipedia.org/wiki/%E5%85%B1%E6%9C%89%E3%83%A1%E3%83%A2% E3% 83% AA #% E3% 82% BD% E3% 83% 95% E3% 83% 88% E3% 82% A6% E3% 82% A7% E3% 82% A2% E3% 81% AB% E3 % 82% 88% E3% 82% 8B% E5% 85% B1% E6% 9C% 89% E3% 83% A1% E3% 83% A2% E3% 83% AA) peuvent être réalisés. En Python, Value </ code> et Array </ code> vous permettent de stocker des données numériques et des dates de tableau dans la mémoire partagée. En passant, Value </ code> et Array </ code> utilisent la structure de données du langage C telle quelle. Les [nombres (héritant de la classe des nombres)] de Python (https://docs.python.org/ja/3/library/numbers.html) sont immuables et ne peuvent pas être réécrits directement. ..

from multiprocessing import Process, Value, Array


def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]


num = Value('d', 0.0)  #numéro de type double
arr = Array('i', range(10))  #Tableau

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print(num.value)
print(arr[:])

Résultat de l'exécution:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
    • Le module Multiprocessing.shared_memory a été ajouté à partir de python 3.8. Je mettrai à jour lorsque miniconda sera mis à jour. *

2-3-4. Gestionnaire

Il peut être plus exact de dire que le gestionnaire partage les données plutôt que de les transmettre. Manager () </ code> renvoie un objet gestionnaire et crée un processus serveur. Grâce au processus serveur, d'autres processus peuvent travailler avec des objets Python de manière proxy. Les objets Manager prennent en charge les objets Python list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array </ code>.

from multiprocessing import Process, Manager


def f(d, l, i):
    d[i] = i
    d[str(i)] = str(i)
    l.append(i)
    print(l)


with Manager() as manager:
    shared_dict = manager.dict()
    shared_list = manager.list()
    p_list = []
    #Créer 10 processus
    for i in range(10):
        p = Process(target=f, args=(shared_dict, shared_list, i))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()

    print('All subprocesses done.')
    print(shared_dict)
    print(shared_list)

Résultat de l'exécution:

[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6, 8]
[0, 1, 2, 3, 4, 5, 6, 8, 7]
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]
All subprocesses done.
{0: 0, '0': '0', 1: 1, '1': '1', 2: 2, '2': '2', 3: 3, '3': '3', 4: 4, '4': '4', 5: 5, '5': '5', 6: 6, '6': '6', 8: 8, '8': '8', 7: 7, '7': '7', 9: 9, '9': '9'}
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]

J'ai essayé de créer une liste et un dictionnaire de partage inter-processus dans le gestionnaire. Ici, vous pouvez voir que les processus ne sont pas en séquence.

2-3-5. Traitement du verrouillage de processus

Comme les threads, les processus ont des verrous.

from multiprocessing import Process, Lock


def f(i):
    lock.acquire()
    try:
        print('hello world', i)
    finally:
        lock.release()


lock = Lock()

for num in range(10):
    Process(target=f, args=(num,)).start()

Résultat de l'exécution:

hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9

En raison du verrouillage, les nombres sont affichés dans l'ordre contrairement à la fois précédente. Cependant, vous ne pourrez pas démontrer les performances du multi-processus.

2-4. Traitement des processus distribués

Les processus Python peuvent être un traitement de processus distribué utilisant plusieurs machines. Le sous-module managers </ code> du module multiprocessing </ code> peut répartir les processus sur plusieurs machines. Même si vous ne connaissez pas le protocole de communication, vous pouvez écrire un programme pour le traitement de processus distribué.

Le traitement de processus distribué nécessite un processus serveur qui distribue les tâches et un processus de travail qui traite réellement les tâches. Tout d'abord, implémentez le processus serveur task_master.py </ code>.

Ici, managers </ code> publie la file d'attente sur Internet en tant que ** api **. Une fois que le processus serveur a démarré la file d'attente et mis la tâche, il est accessible à partir d'autres machines.

task_master.py


import random
import queue  #Comme c'est via le net, la file d'attente standard de la bibliothèque est suffisante
from multiprocessing.managers import BaseManager


#File d'attente pour envoyer des tâches
task_queue = queue.Queue()
#File d'attente pour recevoir les résultats
result_queue = queue.Queue()


class QueueManager(BaseManager):
    pass


#Enregistrer deux files d'attente en tant qu'API
#Dans le cas de Windows, lambda peut être utilisé pour l'enregistrement de l'API, veuillez donc définir la fonction de manière obéissante
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

#Utilisez le port 5000 pour le chiffrement d'authentification'abc'À
#Pour Windows, l'adresse doit être spécifiée (127).0.0.1)
manager = QueueManager(address=('', 5000), authkey=b'abc')
#commencer
manager.start()
#Obtenir un objet de file d'attente via net
task = manager.get_task_queue()
result = manager.get_result_queue()

#Essayez de mettre dans une tâche
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task {}...'.format(n))
    task.put(n)

#recevoir le résultat de la file d'attente des résultats
print('Try get results...')
for i in range(10):
    #S'il dépasse 10 secondes, il se termine avec un délai d'expiration
    r = result.get(timeout=10)
    print('Result: {}'.format(r))

#Fin
manager.shutdown()
print('master exit.')

Ensuite, implémentez le task_worker.py </ code> pour le processus de travail. Obtenez la tâche avec ** api ** appelée manager.get_task_queue </ code> publiée ci-dessus et traitez-la.

task_worker.py


import time
import queue
from multiprocessing.managers import BaseManager


#Créer le même gestionnaire de files d'attente
class QueueManager(BaseManager):
    pass


#Obtenez l'API sur le net et enregistrez-la dans le gestionnaire de files d'attente
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

#Connectez-vous au serveur
server_addr = '127.0.0.1'
print('Connect to server {}...'.format(server_addr))
#Définissez le même port et le même cryptage d'authentification
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
#Lien
m.connect()

#Obtenez chaque file d'attente
task = m.get_task_queue()
result = m.get_result_queue()

#Recevoir une tâche de la file d'attente des tâches
#Stocker le résultat du traitement dans la file d'attente des résultats
for i in range(10):
    try:
        n = task.get(timeout=1)
        #Ici, la tâche est un simple calcul carré.
        print('run task {} * {}...'.format(n, n))
        r = '{} * {} = {}'.format(n, n, n*n)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print('task queue is empty.')

#Fin
print('worker exit.')

Il peut également être exécuté sur une machine locale.

Résultat de l'exécution: Tout d'abord, le processus serveur place d'abord la tâche dans la task_queue </ code>. Une fois que tout est entré, attendez les résultats dans result_queue </ code>.

task_master.py


Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...

Le processus de travail se connecte ensuite au serveur, récupère la tâche dans la task_queue </ code> et la traite. Le résultat du traitement est envoyé à result_queue </ code>.

task_worker.py


Connect to server 127.0.0.1...
run task 7710 * 7710...
run task 6743 * 6743...
run task 8458 * 8458...
run task 2439 * 2439...
run task 1351 * 1351...
run task 9885 * 9885...
run task 5532 * 5532...
run task 4181 * 4181...
run task 6093 * 6093...
run task 3815 * 3815...
worker exit.

Lorsque le résultat arrive dans result_queue </ code>, le processus serveur le sort dans l'ordre.

task_master.py


Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...
Result: 7710 * 7710 = 59444100
Result: 6743 * 6743 = 45468049
Result: 8458 * 8458 = 71537764
Result: 2439 * 2439 = 5948721
Result: 1351 * 1351 = 1825201
Result: 9885 * 9885 = 97713225
Result: 5532 * 5532 = 30603024
Result: 4181 * 4181 = 17480761
Result: 6093 * 6093 = 37124649
Result: 3815 * 3815 = 14554225
master exit.

Toutes les files d'attente sont dans le processus serveur car le processus de travail ne crée pas la file d'attente.

Screen Shot 2020-02-27 at 0.58.21.png (Source: [réseau gouvernemental de type Hiroyukimine](https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600))

De cette manière, les processus distribués peuvent être réalisés en Python. Une puissance de calcul puissante peut être obtenue en utilisant plusieurs travailleurs.

  1. subprocess Dans le système d'exploitation basé sur Unix, fork () </ code> a été expliqué pour faire une copie du processus actuel en tant que processus enfant. Autrement dit, appeler os.fork </ code> en Python crée un processus enfant de votre programme Python. Cependant, il y a des moments où vous avez besoin d'un processus enfant capable d'exécuter des commandes externes plutôt qu'un programme Python. Il existe un autre appel système exec () </ code> dans le système d'exploitation basé sur Unix. Il est implémenté en tant que os.execve </ code> en Python. exec () </ code> est une fonction qui remplace actuellement le processus par un autre programme. C'est-à-dire que os.fork </ code> crée un processus enfant d'un programme Python, et os.execve </ code> utilise d'autres programmes ( ls </ code>, ls </ code>, qui peuvent être exécutés dans le shell. Il peut être remplacé par un programme comme code> ping </ code>. Le sous-processus </ code> de la bibliothèque standard est un module permettant de créer des processus enfants qui exécutent des programmes externes. Ensuite, lors de l'exécution d'un programme externe avec sous-processus </ code>, créez un tube (Pipe) pour la communication inter-processus entre le processus Python et le processus enfant, transmettez les paramètres et envoyez les valeurs de retour et les erreurs. Vous pourrez le recevoir.

3-1. subprocess.run À partir de Python 3.5, il est officiellement recommandé d'exécuter la commande dans subprocess.run </ code>. Ici, l'explication telle que subprocess.call </ code> de l'ancienne ** api ** est omise.

subprocess.run(args, *, stdin=None, input=None, 
    stdout=None, stderr=None, shell=False, timeout=None, check=False, universal_newlines=False)

subprocess.run </ code> renvoie une instance de la classe CompletedProcess </ code>. Les attributs de la classe CompletedProcess </ code> sont les suivants.

attribut La description
args Paramètres passés aux processus enfants; chaîne ou liste
returncode Stocke le code d'état après l'exécution
stdout Sortie standard après exécution
stderr Erreur standard après exécution
check_returncode() Déclenche CalledProcessError lorsque le code d'état est différent de zéro (échec d'exécution)

Voici quelques exemples d'utilisation de subprocess.run </ code>.

Vous pouvez capturer la sortie standard avec subprocess.PIPE </ code> (sinon la sortie sera supprimée).

import subprocess


# subprocess.run(["ls", "-l"] stdout=subprocess.PIPE)Pareil que
obj = subprocess.run(["ls", "-l"], stdout=subprocess.PIPE)
print('stdout:\n{}'.format(obj.stdout.decode()))

Résultat de l'exécution:

stdout:
total 128
-rw-r--r--@ 1 kaito  staff   692 Feb 16 19:35 1-1.py
-rw-r--r--@ 1 kaito  staff   509 Feb 17 23:39 1-2.py
-rw-r--r--@ 1 kaito  staff   364 Feb 19 16:48 2-10.py
-rw-r--r--@ 1 kaito  staff   645 Feb 19 19:12 2-17.py
-rw-r--r--@ 1 kaito  staff   213 Feb 19 19:14 2-18.py
-rw-r--r--@ 1 kaito  staff   209 Feb 19 19:18 2-19.py
-rw-r--r--@ 1 kaito  staff   318 Feb 19 23:53 2-20.py
-rw-r--r--@ 1 kaito  staff   194 Feb 19 23:57 2-21.py
-rw-r--r--@ 1 kaito  staff   230 Feb 20 15:46 2-23.py
-rw-r--r--@ 1 kaito  staff   131 Feb 18 19:39 2-4.py
-rw-r--r--@ 1 kaito  staff   543 Feb 18 19:50 2-8.py
-rw-r--r--@ 1 kaito  staff   240 Feb 18 22:29 2-9.py
-rw-r--r--  1 kaito  staff  1339 Feb 27 00:25 task_master.py
-rw-r--r--  1 kaito  staff  1086 Feb 27 00:31 task_worker.py
-rw-r--r--  1 kaito  staff   446 Feb 27 20:26 test.py
-rw-r--r--  1 kaito  staff   199 Feb 27 20:31 test2.py

Si check </ code> est défini sur True, une erreur se produit lorsque le code d'état est différent de zéro.

subprocess.run("exit 1", shell=True, check=True)

Résultat de l'exécution:

Traceback (most recent call last):
  File "test2.py", line 4, in <module>
    subprocess.run("exit 1", shell=True, check=True)
  File "/Users/kaito/opt/miniconda3/lib/python3.7/subprocess.py", line 487, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1.

Le \ _ \ _ repr \ _ \ _ </ code> de la classe CompletedProcess </ code> ressemble à ceci.

print(subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE))

Résultat de l'exécution:

CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0, stdout=b'crw-rw-rw-  1 root  wheel    3,   2 Feb 27 20:37 /dev/null\n')

3-2. subprocess.Popen Pour les opérations avancées, vous pouvez utiliser la classe subprocess.Popen </ code>.

class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, 
    preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False,
    startup_info=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=())

La méthode de la classe subprocess.Popen </ code> est la suivante.

Méthode La description
poll Renvoie un code d'état à la fin de l'exécution du processus enfant; renvoie None s'il n'est pas terminé
wait Attendez la fin de l'exécution du processus enfant; déclenchez une erreur TimeoutExpired lorsque le délai d'expiration se produit
communicate Communiquer avec les processus enfants
send_signal Envoyer un signal à un processus enfant, par exemple un signal.signal(signal.SIGINT)Est la ligne de commande du système d'exploitation UNIX, Ctrl+Signal en appuyant sur C
terminate Terminer le processus enfant
kill Tuer le processus enfant

Voici quelques exemples d'utilisation de subprocess.Popen </ code>.

Vous pouvez exécuter votre code Python en tant que programme externe.

import subprocess


#Connectez les tuyaux aux entrées standard, aux sorties standard, aux erreurs standard
p = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#Ecrire des données sur l'entrée standard
p.stdin.write(b'print("stdin")\n')
#Passer des données comme entrée pour communiquer
out, err = p.communicate(input=b'print("communicate")\n')
print(out.decode())

Résultat de l'exécution:

stdin
communicate

Le traitement de pipeline à l'aide de | </ code> peut être construit en connectant la sortie standard et l'entrée standard de deux processus enfants avec un tube.

#Conduisez les deux processus enfants ensemble
p1 = subprocess.Popen(['df', '-h'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'Data'], stdin=p1.stdout, stdout=subprocess.PIPE)
out, err = p2.communicate()  # df -h | grep Data
print(out.decode())

Résultat de l'exécution:

/dev/disk1s1   466Gi  438Gi  8.0Gi    99% 1156881 4881295959    0%   /System/Volumes/Data
map auto_home    0Bi    0Bi    0Bi   100%       0          0  100%   /System/Volumes/Data/home

  1. concurrent.futures Nous vous avons présenté le multithreading et le multiprocessing de Python. Vous avez peut-être une image un peu compliquée et difficile à comprendre, mais c'est vrai (rires). [Aller](https://ja.wikipedia.org/wiki/Go_(%E3%83%97%E3%83%AD%E3%82%B0%E3%83%A9%E3%83%9F%E3 Les langages avec une philosophie de conception de traitement parallèle / parallèle simple depuis le début, tels que% 83% B3% E3% 82% B0% E8% A8% 80% E8% AA% 9E)), indiquent la direction d'évolution des langages de programmation. C'est possible.

Cependant, l'évolution de Python ne s'est pas encore arrêtée. Un module de haut niveau appelé concurrent </ code> qui encapsule davantage le threading </ code> et le multiprocessing </ code> pour faciliter son utilisation est actuellement en cours de développement.

Le concurrent </ code> actuel n'a qu'un module appelé futures </ code>. futures </ code> est [Modèle futur](https://ja.wikipedia.org/wiki/Future_%E3%83%91%E3%82%BF%E3%83%BC%E3%83% B3) Implémentation Python. Ici, je voudrais présenter les fonctions qui peuvent être utilisées à ce moment.

4-1. Exécuteur testamentaire et avenir

concurrent.futures </ code> fournit ThreadPoolExecutor </ code> et ProcessPoolExecutor </ code>, qui héritent de la classe Executor </ code>. Devenir. ThreadPoolExecutor </ code> et ProcessPoolExecutor </ code> reçoivent un argument appelé max_works </ code> qui spécifie le nombre de threads ou de processus. Effectue une seule tâche avec la méthode submit </ code> et retourne une instance de la classe Future </ code>.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests


def load_url(url):
    return requests.get(url)


if __name__ == '__main__':
    url = 'https://www.python.org/'
    executor = ProcessPoolExecutor(max_workers=4)  # ThreadPoolExecutor(max_workers=4)
    future = executor.submit(load_url, url)
    print(future)
    while 1:
        if future.done():
            print('status code: {}'.format(future.result().status_code))
            break

Résultat de l'exécution:

<Future at 0x10ae058d0 state=running>
status code: 200

Une simple requête http. Notez que lorsque vous utilisez ProcessPoolExecutor </ code>, le module \ _ \ _ main__ </ code> est requis, ne l'exécutez donc pas dans un environnement REPL.

The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.

4-2. Map, as_completed et attendez

La méthode submit </ code> ne peut exécuter qu'une seule tâche, donc si vous souhaitez exécuter plusieurs tâches, map </ code>, as_completed </ code> et wait < Utilisez / code>.

La méthode map </ code> prend une fonction d'exécution et une séquence comme arguments et renvoie un générateur de résultats d'exécution.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests


URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']


def load_url(url):
    return requests.get(url)


if __name__ == '__main__':
    # with ThreadPoolExecutor(max_workers=4) as executor:
    with ProcessPoolExecutor(max_workers=4) as executor:
        for url, data in zip(URLS, executor.map(load_url, URLS)):
            print('{} - status_code {}'.format(url, data.status_code))

Résultat de l'exécution:

https://google.com - status_code 200
https://www.python.org/ - status_code 200
https://api.github.com/ - status_code 200

La méthode as_completed </ code> renvoie un générateur pour l'objet Future </ code>. Et il se bloque lorsque la tâche n'est pas terminée.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import requests


URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']


def load_url(url):
    return url, requests.get(url).status_code


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        for future in as_completed(tasks):
            print(*future.result())

Résultat de l'exécution:

https://google.com 200
https://www.python.org/ 200
https://api.github.com/ 200

La méthode wait </ code> bloque le thread principal et le processus principal. Vous pouvez définir trois conditions avec l'argument return_when </ code>.

conditions La description
ALL_COMPLETED Libérez le blocage lorsque toutes les tâches sont terminées
FIRST_COMPLETED Libérer le blocage lorsqu'une tâche est terminée
FIRST_EXCEPTION Libérez le blocage si une tâche provoque une erreur
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED
import requests


URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']


def load_url(url):
    requests.get(url)
    print(url)


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        wait(tasks, return_when=ALL_COMPLETED)
        print('all completed.')  #Après 3 impressions, le processus principal est libéré pour imprimer

Résultat de l'exécution:

https://www.python.org/
https://api.github.com/
https://google.com
all completed.

référence

Exécution parallèle threading --- Threading traitement parallèle multitraitement --- Traitement parallèle basé sur les processus sous-processus --- Gestion des sous-processus concurrent.futures - Exécution de tâches parallèles

Recommended Posts