Ray est un framework qui vous permet d'écrire rapidement et simplement un traitement parallèle distribué en Python, et est conçu pour faciliter la parallélisation du code existant. En utilisant Ray, vous pouvez écrire un traitement parallèle au niveau du processus plus facilement que le multitraitement.
Cet article est basé sur le contenu du Tutoriel Ray. Le code a été confirmé pour fonctionner avec Python 3.8.2, Ray 0.8.4.
Vous pouvez l'installer à partir de pip, etc. dans le terminal.
$ pip install ray
Il n'y a que trois utilisations de base à retenir: ray.init`` ray.remote`` ray.get
, et cet article présentera également ray.wait
ray.put
.
Pensez à paralléliser l'exécution de func
pour le code suivant, où la fonction func
, qui prend 3 secondes à s'exécuter, est appelée deux fois et l'exécution entière prend 6 secondes.
import time
def func(x):
time.sleep(3)
return x
begin_time = time.time() #Heure de début d'enregistrement
res1, res2 = func(1), func(2) #Appeler func deux fois
print(res1, res2) #production: 1 2
end_time = time.time() #Enregistrez l'heure de fin
print(end_time - begin_time) #Environ 6 secondes
Lors de l'utilisation de Ray, il est nécessaire de ** toujours d'abord ** spécifier le nombre de ressources à utiliser dans ray.init
et de démarrer le processus Ray.
import ray
# ray.init()Si vous ne spécifiez pas explicitement comme, le nombre de ressources sera déterminé automatiquement.
ray.init(num_cpus=4)
#Attendez un peu que Ray démarre pour une mesure du temps plus précise
time.sleep(1)
Si vous voulez exécuter une fonction en parallèle, vous devez en faire une ** fonction distante ** que Ray peut gérer.
Cela dit, c'est facile à faire, il suffit d'ajouter @ ray.remote
et un décorateur à la fonction.
La fonction distante peut être appelée comme (nom de la fonction) .remote (argument)
et envoyée aux travailleurs parallèles de Ray pour exécution.
.remote (argument)
renvoie ** Object ID ** sans attendre qu'il se termine.
@ray.remote
def func(x):
time.sleep(3)
return x
begin_time = time.time()
res1, res2 = func.remote(1), func.remote(2)
print(res1) #Exemple de sortie: ObjectID(45b9....)
Si vous voulez obtenir le résultat, vous pouvez passer l'ID d'objet renvoyé par la fonction distante à ray.get
.
«ray.get» bloque jusqu'à ce que tous les résultats correspondant à l'ID d'objet soient obtenus.
print(ray.get(res1), ray.get(res2)) #production: 1 2
# ray.get peut aussi recevoir une liste
print(ray.get([res1, res2])) #production: [1, 2]
end_time = time.time()
print(end_time - begin_time) #Environ 3 secondes
Lorsque le code ci-dessus est exécuté comme un seul script, cela ne prend qu'environ 3 secondes, et vous pouvez voir que l'exécution de func
est parallélisée.
Voilà toutes les bases.
Ray peut gérer même s'il existe une dépendance entre les fonctions distantes en passant l'ID d'objet tel quel. L'ID d'objet transmis est restauré dans un objet Python normal et exécuté lorsqu'il est réellement exécuté. Dans l'exemple ci-dessous, «func1» et «func2» sont appliqués en séquence à chacun des quatre éléments de «vec». Le traitement d'un élément prend 2 secondes.
@ray.remote
def func1(x):
time.sleep(1)
return x * 2
@ray.remote
def func2(x):
time.sleep(1)
return x + 1
vec = [1, 2, 3, 4]
results = []
for x in vec:
res = func1.remote(x) #objectID est inclus dans res
res = func2.remote(res) #Passez l'ObjectID tel quel à la fonction distante suivante
results.append(res)
#results est une liste d'ObjectID
print(ray.get(results)) #production: [3, 5, 7, 9]
Ray analyse les dépendances, lance d'abord func1
sans dépendances, puis exécute func2
en parallèle pour les éléments traités de func1
.
Ce processus, qui prend 8 secondes séquentiellement, est exécuté en environ 2 secondes par parallélisation.
Ray prend également en charge les appels imbriqués et la réécriture de func2
comme suit fonctionne très bien.
La seule condition pour un appel imbriqué est que la fonction que vous souhaitez appeler soit prédéfinie.
@ray.remote
def func2(x):
x = func1.remote(x) #ObjectID renvoyé
time.sleep(1)
return ray.get(x) + 1 #Parce qu'il ne peut pas être ajouté directement à l'ID d'objet, ray.Obtenez puis calculez
print(ray.get([func2.remote(x) for x in vec])) #production: [3, 5, 7, 9]
La valeur mesurée dans mon environnement est un peu plus lente que 2 secondes, mais elle peut être exécutée en parallèle plus rapidement que 8 secondes.
Actor
La fonction remote retourne telle quelle après avoir été exécutée et ne peut pas avoir d'état.
Dans Ray, le traitement qui a un état est réalisé en modifiant la classe avec @ ray.remote
.
Les classes qualifiées avec @ ray.remote
sont appelées ** Actor **.
Par exemple, considérez le compteur suivant qui prend 1 seconde par incrément.
Lors de la création d'une instance d'Actor, ajoutez .remote ()
comme dans le cas d'un appel de fonction.
@ray.remote
class Counter:
def __init__(self, init_val, sleep=True):
#Compteur d'initiation_Initialiser avec val
self.count = init_val
self.sleep = sleep
def increment(self):
if self.sleep:
time.sleep(1)
self.count += 1
return self.count
#Créer des compteurs avec des valeurs initiales de 0 et 100
counter1, counter2 = Counter.remote(0), Counter.remote(100)
Enregistrons la valeur à chaque étape des résultats tout en incrémentant chaque compteur trois fois.
results = []
for _ in range(3):
results.append(counter1.increment.remote())
results.append(counter2.increment.remote())
print(ray.get(results)) #production: [1, 101, 2, 102, 3, 103]
Il a été incrémenté un total de 6 fois, mais comme il est parallélisé pour chaque compteur, il ne faut que 3 secondes pour obtenir la valeur.
De plus, si vous souhaitez appeler les méthodes de la même instance d'Actor en parallèle, vous pouvez définir une fonction distante qui prend une instance d'Actor comme argument.
Par exemple, exécutons une fonction appelée ʻincrementer qui appelle ʻincrement
toutes les secondes avec un décalage de 0,5 seconde comme suit.
Notez qu'ici nous avons un Counter
qui fait que l'incrément` lui-même se termine en un instant.
@ray.remote
def incrementer(counter, id, times):
#Incrément de fois chaque seconde
for _ in range(times):
cnt = counter.increment.remote()
print(f'id= {id} : count = {ray.get(cnt)}')
time.sleep(1)
counter = Counter.remote(0, sleep=False) #Un compteur où un incrément se termine en un instant
incrementer.remote(counter, id=1, times=5)
time.sleep(0.5) #Début 0.Décalage de 5 secondes
inc = incrementer.remote(counter, id=2, times=5)
ray.wait([inc]) #Sera expliqué ensuite,Fonction d'attendre la fin
Lorsque vous l'exécutez, vous pouvez voir que ʻincrementer met à jour la valeur de
counter` alternativement toutes les 0,5 secondes comme suit.
(0.0 seconde plus tard) id = 1 : count = 1
(0.5 secondes plus tard) id = 2 : count = 2
(1.0 seconde plus tard) id = 1 : count = 3
(1.5 secondes plus tard) id = 2 : count = 4
(2.0 seconde plus tard) ......
ray.wait
Si vous passez une liste d'ID d'objets exécutés en parallèle à ray.get
, vous ne pourrez pas obtenir les valeurs tant qu'elles n'auront pas toutes fini de fonctionner.
Si vous utilisez ray.wait
, il attendra que le nombre spécifié de fonctions exécutées en parallèle soit terminé, et retournera l'ID qui s'est terminé à ce point et l'ID qui ne l'a pas fait.
@ray.remote
def sleep(x):
#Une fonction qui se repose pendant x secondes et renvoie x
time.sleep(x)
return x
ids = [sleep.remote(3), sleep.remote(5), sleep.remote(2)]
finished_ids, running_ids = ray.wait(ids, num_returns=2, timeout=None)
print(ray.get(finished_ids)) #production(Après 3 secondes): [3,2]
print(ray.get(running_ids)) #production(Après 5 secondes): [5]
ray.put
En fait, chaque objet passé à la fonction remote
est implicitement sérialisé et copié dans la mémoire partagée de Ray.
Par conséquent, si un objet énorme est passé à l'argument de distant
plusieurs fois, la copie prendra plus de temps et la zone de la mémoire partagée sera gaspillée.
Dans de tels cas, vous pouvez éviter ce gaspillage en copiant explicitement une seule fois à l'avance en utilisant ray.put
.
ray.put
renvoie un ID d'objet comme remote
et le transmet à la fonction remote.
Une fois copié, l'objet est partagé, de sorte que tout travailleur s'exécutant en parallèle peut le voir.
@ray.remote
def func4(obj, idx):
time.sleep(1)
return idx
# big_Laisser objet être un gros objet
big_object = None
big_obj_id = ray.put(big_object)
# func.remote()Est appelé quatre fois,Je passe l'ID d'objet, donc c'est encore gros_La copie de l'objet ne se produit pas
results = [func4.remote(big_obj_id, i) for i in range(4)]
print(ray.get(results)) #production: [0, 1, 2, 3]
Notez que la désérialisation de Ray.get semble être beaucoup plus rapide que celle de pickle.load.
Le Document officiel a une utilisation plus détaillée. En particulier, Exemples contient des exemples spécifiques tels que le serveur de paramètres et l'apprentissage amélioré dans un environnement distribué, ce qui est utile. Allons. Il existe également un framework de haut niveau basé sur Ray, [RLlib] pour un apprentissage amélioré (https://docs.ray.io/en/latest/rllib.html) et [pour le réglage des hyperparamètres]. Tune](https://docs.ray.io/en/latest/tune.html) et ainsi de suite. Obtenons une vie de traitement parallèle confortable avec Ray.
Recommended Posts