Introduction au traitement parallèle distribué Python par Ray

Qu'est-ce que Ray

Ray est un framework qui vous permet d'écrire rapidement et simplement un traitement parallèle distribué en Python, et est conçu pour faciliter la parallélisation du code existant. En utilisant Ray, vous pouvez écrire un traitement parallèle au niveau du processus plus facilement que le multitraitement.

Cet article est basé sur le contenu du Tutoriel Ray. Le code a été confirmé pour fonctionner avec Python 3.8.2, Ray 0.8.4.

Installation

Vous pouvez l'installer à partir de pip, etc. dans le terminal.

$ pip install ray

Comment utiliser

Il n'y a que trois utilisations de base à retenir: ray.init`` ray.remote`` ray.get, et cet article présentera également ray.wait ray.put.

Bases de la parallélisation avec Ray

Pensez à paralléliser l'exécution de func pour le code suivant, où la fonction func, qui prend 3 secondes à s'exécuter, est appelée deux fois et l'exécution entière prend 6 secondes.

import time

def func(x):
    time.sleep(3)
    return x

begin_time = time.time() #Heure de début d'enregistrement

res1, res2 = func(1), func(2) #Appeler func deux fois
print(res1, res2)  #production: 1 2

end_time = time.time() #Enregistrez l'heure de fin

print(end_time - begin_time) #Environ 6 secondes

Lors de l'utilisation de Ray, il est nécessaire de ** toujours d'abord ** spécifier le nombre de ressources à utiliser dans ray.init et de démarrer le processus Ray.

import ray

# ray.init()Si vous ne spécifiez pas explicitement comme, le nombre de ressources sera déterminé automatiquement.
ray.init(num_cpus=4)

#Attendez un peu que Ray démarre pour une mesure du temps plus précise
time.sleep(1)

Si vous voulez exécuter une fonction en parallèle, vous devez en faire une ** fonction distante ** que Ray peut gérer. Cela dit, c'est facile à faire, il suffit d'ajouter @ ray.remote et un décorateur à la fonction. La fonction distante peut être appelée comme (nom de la fonction) .remote (argument) et envoyée aux travailleurs parallèles de Ray pour exécution. .remote (argument) renvoie ** Object ID ** sans attendre qu'il se termine.

@ray.remote
def func(x):
    time.sleep(3)
    return x

begin_time = time.time()
res1, res2 = func.remote(1), func.remote(2)
print(res1) #Exemple de sortie: ObjectID(45b9....) 

Si vous voulez obtenir le résultat, vous pouvez passer l'ID d'objet renvoyé par la fonction distante à ray.get. «ray.get» bloque jusqu'à ce que tous les résultats correspondant à l'ID d'objet soient obtenus.

print(ray.get(res1), ray.get(res2)) #production: 1 2

# ray.get peut aussi recevoir une liste
print(ray.get([res1, res2])) #production: [1, 2]

end_time = time.time()
print(end_time - begin_time) #Environ 3 secondes

Lorsque le code ci-dessus est exécuté comme un seul script, cela ne prend qu'environ 3 secondes, et vous pouvez voir que l'exécution de func est parallélisée. Voilà toutes les bases.

Parallélisation dépendante

Ray peut gérer même s'il existe une dépendance entre les fonctions distantes en passant l'ID d'objet tel quel. L'ID d'objet transmis est restauré dans un objet Python normal et exécuté lorsqu'il est réellement exécuté. Dans l'exemple ci-dessous, «func1» et «func2» sont appliqués en séquence à chacun des quatre éléments de «vec». Le traitement d'un élément prend 2 secondes.

@ray.remote
def func1(x):
    time.sleep(1)
    return x * 2

@ray.remote
def func2(x):
    time.sleep(1)
    return x + 1

vec = [1, 2, 3, 4]
results = []

for x in vec:
    res = func1.remote(x)  #objectID est inclus dans res
    res = func2.remote(res) #Passez l'ObjectID tel quel à la fonction distante suivante
    results.append(res)

#results est une liste d'ObjectID
print(ray.get(results)) #production: [3, 5, 7, 9]

Ray analyse les dépendances, lance d'abord func1 sans dépendances, puis exécute func2 en parallèle pour les éléments traités de func1. Ce processus, qui prend 8 secondes séquentiellement, est exécuté en environ 2 secondes par parallélisation.

Ray prend également en charge les appels imbriqués et la réécriture de func2 comme suit fonctionne très bien. La seule condition pour un appel imbriqué est que la fonction que vous souhaitez appeler soit prédéfinie.

@ray.remote
def func2(x):
    x = func1.remote(x)  #ObjectID renvoyé
    time.sleep(1)
    return ray.get(x) + 1 #Parce qu'il ne peut pas être ajouté directement à l'ID d'objet, ray.Obtenez puis calculez

print(ray.get([func2.remote(x) for x in vec])) #production: [3, 5, 7, 9]

La valeur mesurée dans mon environnement est un peu plus lente que 2 secondes, mais elle peut être exécutée en parallèle plus rapidement que 8 secondes.

Actor La fonction remote retourne telle quelle après avoir été exécutée et ne peut pas avoir d'état. Dans Ray, le traitement qui a un état est réalisé en modifiant la classe avec @ ray.remote. Les classes qualifiées avec @ ray.remote sont appelées ** Actor **.

Par exemple, considérez le compteur suivant qui prend 1 seconde par incrément. Lors de la création d'une instance d'Actor, ajoutez .remote () comme dans le cas d'un appel de fonction.

@ray.remote
class Counter:
    def __init__(self, init_val, sleep=True):
        #Compteur d'initiation_Initialiser avec val
        self.count = init_val
        self.sleep = sleep

    def increment(self):
        if self.sleep:
            time.sleep(1)
        self.count += 1
        return self.count

#Créer des compteurs avec des valeurs initiales de 0 et 100
counter1, counter2 = Counter.remote(0), Counter.remote(100)

Enregistrons la valeur à chaque étape des résultats tout en incrémentant chaque compteur trois fois.

results = []
for _ in range(3):
    results.append(counter1.increment.remote())
    results.append(counter2.increment.remote())

print(ray.get(results)) #production: [1, 101, 2, 102, 3, 103]

Il a été incrémenté un total de 6 fois, mais comme il est parallélisé pour chaque compteur, il ne faut que 3 secondes pour obtenir la valeur.

De plus, si vous souhaitez appeler les méthodes de la même instance d'Actor en parallèle, vous pouvez définir une fonction distante qui prend une instance d'Actor comme argument. Par exemple, exécutons une fonction appelée ʻincrementer qui appelle ʻincrement toutes les secondes avec un décalage de 0,5 seconde comme suit. Notez qu'ici nous avons un Counter qui fait que l'incrément` lui-même se termine en un instant.

@ray.remote
def incrementer(counter, id, times):
    #Incrément de fois chaque seconde
    for _ in range(times):
        cnt = counter.increment.remote()
        print(f'id= {id} : count = {ray.get(cnt)}')
        time.sleep(1)

counter = Counter.remote(0, sleep=False) #Un compteur où un incrément se termine en un instant

incrementer.remote(counter, id=1, times=5)
time.sleep(0.5)  #Début 0.Décalage de 5 secondes
inc = incrementer.remote(counter, id=2, times=5)

ray.wait([inc]) #Sera expliqué ensuite,Fonction d'attendre la fin

Lorsque vous l'exécutez, vous pouvez voir que ʻincrementer met à jour la valeur de counter` alternativement toutes les 0,5 secondes comme suit.

(0.0 seconde plus tard) id = 1 : count = 1
(0.5 secondes plus tard) id = 2 : count = 2
(1.0 seconde plus tard) id = 1 : count = 3
(1.5 secondes plus tard) id = 2 : count = 4
(2.0 seconde plus tard) ......

ray.wait Si vous passez une liste d'ID d'objets exécutés en parallèle à ray.get, vous ne pourrez pas obtenir les valeurs tant qu'elles n'auront pas toutes fini de fonctionner. Si vous utilisez ray.wait, il attendra que le nombre spécifié de fonctions exécutées en parallèle soit terminé, et retournera l'ID qui s'est terminé à ce point et l'ID qui ne l'a pas fait.

@ray.remote
def sleep(x):
    #Une fonction qui se repose pendant x secondes et renvoie x
    time.sleep(x)
    return x

ids = [sleep.remote(3), sleep.remote(5), sleep.remote(2)]
finished_ids, running_ids = ray.wait(ids, num_returns=2, timeout=None)

print(ray.get(finished_ids)) #production(Après 3 secondes): [3,2] 
print(ray.get(running_ids))  #production(Après 5 secondes): [5]

ray.put En fait, chaque objet passé à la fonction remote est implicitement sérialisé et copié dans la mémoire partagée de Ray. Par conséquent, si un objet énorme est passé à l'argument de distant plusieurs fois, la copie prendra plus de temps et la zone de la mémoire partagée sera gaspillée.

Dans de tels cas, vous pouvez éviter ce gaspillage en copiant explicitement une seule fois à l'avance en utilisant ray.put. ray.put renvoie un ID d'objet comme remote et le transmet à la fonction remote. Une fois copié, l'objet est partagé, de sorte que tout travailleur s'exécutant en parallèle peut le voir.

@ray.remote
def func4(obj, idx):
    time.sleep(1)
    return idx

# big_Laisser objet être un gros objet
big_object = None

big_obj_id = ray.put(big_object)

# func.remote()Est appelé quatre fois,Je passe l'ID d'objet, donc c'est encore gros_La copie de l'objet ne se produit pas
results = [func4.remote(big_obj_id, i) for i in range(4)]

print(ray.get(results)) #production: [0, 1, 2, 3]

Notez que la désérialisation de Ray.get semble être beaucoup plus rapide que celle de pickle.load.

en conclusion

Le Document officiel a une utilisation plus détaillée. En particulier, Exemples contient des exemples spécifiques tels que le serveur de paramètres et l'apprentissage amélioré dans un environnement distribué, ce qui est utile. Allons. Il existe également un framework de haut niveau basé sur Ray, [RLlib] pour un apprentissage amélioré (https://docs.ray.io/en/latest/rllib.html) et [pour le réglage des hyperparamètres]. Tune](https://docs.ray.io/en/latest/tune.html) et ainsi de suite. Obtenons une vie de traitement parallèle confortable avec Ray.

Site de référence

Recommended Posts

Introduction au traitement parallèle distribué Python par Ray
Comment faire un traitement parallèle multicœur avec python
Une introduction à la programmation Python
[Chapitre 5] Introduction à Python avec 100 coups de traitement du langage
Note de lecture: Introduction à l'analyse de données avec Python
[Chapitre 3] Introduction à Python avec 100 coups de traitement du langage
[Chapitre 2] Introduction à Python avec 100 coups de traitement du langage
[Chapitre 4] Introduction à Python avec 100 coups de traitement du langage
[Python] Traitement parallèle facile avec Joblib
Premiers pas avec Python pour les non-ingénieurs
[Tutoriel Python] Une introduction facile à Python
Introduction au remplissage d'image Python Remplissage d'image à l'aide d'ImageDataGenerator
[Introduction à Python] Utilisons foreach avec Python
Une introduction à Python pour l'apprentissage automatique
Une introduction à Python pour les programmeurs en langage C
Création d'un fichier exe avec Python PyInstaller: le PC se fige dans le traitement parallèle
Qu'est-ce qu'un algorithme? Introduction à l'algorithme de recherche] ~ Python ~
[Python] Introduction facile à l'apprentissage automatique avec python (SVM)
Introduction à l'intelligence artificielle avec Python 1 «Théorie des algorithmes génétiques»
Markov Chain Artificial Brainless avec Python + Janome (1) Introduction à Janome
Chaîne de Markov artificielle sans cervelle avec Python + Janome (2) Introduction à la chaîne de Markov
Introduction à l'intelligence artificielle avec Python 2 «Pratique de l'algorithme génétique»
Envoyez un email à l'adresse de Spushi avec python
Introduction à Tornado (1): Framework Web Python démarré avec Tornado
Comment recadrer une image avec Python + OpenCV
Introduction au vol en formation avec Tello edu (Python)
Introduction à Python avec Atom (en route)
Introduction à Python "Re" 1 Construction d'un environnement d'exécution
Introduction au modèle linéaire généralisé (GLM) par Python
Traitement parallèle sans signification profonde en Python
[Introduction à l'application Udemy Python3 +] 9. Tout d'abord, imprimez avec print
Traitement distribué Python Spartan
Introduction au langage Python
Introduction à OpenCV (python) - (2)
Traitement d'image avec Python
Traitement parallèle avec multitraitement
[Introduction à Python] Comment utiliser l'instruction while (traitement répétitif)
[Introduction à Python] Comment itérer avec la fonction range?
Introduction aux mathématiques à partir du mémo d'étude Python Vol.1
[Chapitre 6] Introduction à scicit-learn avec 100 coups de traitement du langage
J'ai essayé d'implémenter le perceptron artificiel avec python
Créer un environnement pour le traitement du langage naturel avec Python
Traitement d'image avec Python (partie 2)
100 coups de traitement du langage avec Python 2015
Connectez-vous à BigQuery avec Python
opencv-python Introduction au traitement d'image
Traitement parallèle avec des fonctions locales
Introduction à Python Django (2) Win
"Traitement Apple" avec OpenCV3 + Python3
Introduction à Private TensorFlow
Une introduction à l'apprentissage automatique
Connectez-vous à Wikipedia avec Python
Traitement du signal acoustique avec Python (2)
Publiez sur Slack avec Python 3
Traitement du signal acoustique avec Python
Introduction à RDB avec sqlalchemy Ⅰ
Introduction à la communication série [Python]
Traitement parallèle avec Parallel de scikit-learn
Traitement d'image avec Python (partie 1)