Ray ist ein Framework, mit dem Sie schnell und einfach verteilte Parallelverarbeitung in Python schreiben können. Es soll das Parallelisieren von vorhandenem Code vereinfachen. Mit Ray können Sie die Parallelverarbeitung auf Prozessebene einfacher schreiben als die Mehrfachverarbeitung.
Dieser Artikel basiert auf dem Inhalt von Ray Tutorial. Es wurde bestätigt, dass der Code mit Python 3.8.2, Ray 0.8.4 funktioniert.
Sie können es von Pip usw. im Terminal installieren.
$ pip install ray
Es gibt nur drei grundlegende Verwendungszwecke: ray.init`` ray.remote`` ray.get
, und in diesem Artikel wird auch ray.wait
ray.put
vorgestellt.
Ziehen Sie in Betracht, die Ausführung von func
für den folgenden Code zu parallelisieren, wobei die Funktion func
, deren Ausführung 3 Sekunden dauert, zweimal aufgerufen wird und die gesamte Ausführung 6 Sekunden dauert.
import time
def func(x):
time.sleep(3)
return x
begin_time = time.time() #Startzeit aufzeichnen
res1, res2 = func(1), func(2) #Rufen Sie func zweimal an
print(res1, res2) #Ausgabe: 1 2
end_time = time.time() #Notieren Sie die Endzeit
print(end_time - begin_time) #Etwa 6 Sekunden
Wenn Sie Ray verwenden, müssen Sie ** immer zuerst ** die Anzahl der Ressourcen angeben, die in ray.init
verwendet werden sollen, und den Ray-Prozess starten.
import ray
# ray.init()Wenn Sie nicht explizit like angeben, wird die Anzahl der Ressourcen automatisch ermittelt.
ray.init(num_cpus=4)
#Warten Sie ein wenig, bis Ray startet, um eine genauere Zeitmessung zu ermöglichen
time.sleep(1)
Wenn Sie eine Funktion parallel ausführen möchten, müssen Sie sie zu einer ** Remote-Funktion ** machen, die Ray verarbeiten kann.
Das heißt, es ist einfach, fügen Sie einfach @ ray.remote
und einen Dekorateur zur Funktion hinzu.
Wenn Sie die Remote-Funktion als "(Funktionsname) .remote (Argument)" aufrufen, wird sie zur Ausführung an Rays parallele Mitarbeiter gesendet.
.remote (argument)
gibt ** Object ID ** zurück, ohne darauf zu warten, dass es beendet wird.
@ray.remote
def func(x):
time.sleep(3)
return x
begin_time = time.time()
res1, res2 = func.remote(1), func.remote(2)
print(res1) #Ausgabebeispiel: ObjectID(45b9....)
Wenn Sie das Ergebnis erhalten möchten, können Sie die von der Remote-Funktion zurückgegebene Objekt-ID an ray.get
übergeben.
ray.get
blockiert, bis alle der Objekt-ID entsprechenden Ergebnisse erhalten wurden.
print(ray.get(res1), ray.get(res2)) #Ausgabe: 1 2
# ray.get kann auch eine Liste erhalten
print(ray.get([res1, res2])) #Ausgabe: [1, 2]
end_time = time.time()
print(end_time - begin_time) #Ca. 3 Sekunden
Wenn der obige Code als ein Skript ausgeführt wird, dauert es nur etwa 3 Sekunden, und Sie können sehen, dass die Ausführung von func
parallelisiert ist.
Das sind alle Grundlagen.
Ray kann auch dann verarbeiten, wenn eine Abhängigkeit zwischen den Remote-Funktionen besteht, indem die Objekt-ID unverändert übergeben wird. Die übergebene Objekt-ID wird in einem normalen Python-Objekt wiederhergestellt und ausgeführt, wenn es tatsächlich ausgeführt wird. Im folgenden Beispiel werden "func1" und "func2" nacheinander auf jedes der vier Elemente in "vec" angewendet. Die Verarbeitung eines Elements dauert 2 Sekunden.
@ray.remote
def func1(x):
time.sleep(1)
return x * 2
@ray.remote
def func2(x):
time.sleep(1)
return x + 1
vec = [1, 2, 3, 4]
results = []
for x in vec:
res = func1.remote(x) #objectID ist in res enthalten
res = func2.remote(res) #Übergeben Sie die ObjectID unverändert an die nächste Remote-Funktion
results.append(res)
#Ergebnisse ist eine Liste von ObjectIDs
print(ray.get(results)) #Ausgabe: [3, 5, 7, 9]
Ray analysiert die Abhängigkeiten, führt zuerst "func1" ohne Abhängigkeiten aus und führt dann "func2" parallel für die verarbeiteten Elemente von "func1" aus. Dieser Vorgang, der nacheinander 8 Sekunden dauert, wird durch Parallelisierung in etwa 2 Sekunden ausgeführt.
Ray unterstützt auch verschachtelte Aufrufe, und das Umschreiben von "func2" wie folgt funktioniert einwandfrei. Die einzige Bedingung für einen verschachtelten Aufruf ist, dass die Funktion, die Sie aufrufen möchten, vordefiniert ist.
@ray.remote
def func2(x):
x = func1.remote(x) #ObjectID wurde zurückgegeben
time.sleep(1)
return ray.get(x) + 1 #Weil es nicht direkt zur Objekt-ID hinzugefügt werden kann, ray.Holen Sie sich und berechnen Sie dann
print(ray.get([func2.remote(x) for x in vec])) #Ausgabe: [3, 5, 7, 9]
Der gemessene Wert in meiner Umgebung ist etwas langsamer als 2 Sekunden, kann aber parallel schneller als 8 Sekunden ausgeführt werden.
Actor
Die Remote-Funktion kehrt nach der Ausführung unverändert zurück und kann keinen Status haben.
In Ray wird eine Verarbeitung mit einem Status realisiert, indem die Klasse mit "@ ray.remote" geändert wird.
Mit @ ray.remote
qualifizierte Klassen heißen ** Actor **.
Betrachten Sie beispielsweise den folgenden Zähler, der 1 Sekunde pro Inkrement benötigt. Fügen Sie beim Erstellen einer Instanz von Actor ".remote ()" hinzu, wie im Fall eines Funktionsaufrufs.
@ray.remote
class Counter:
def __init__(self, init_val, sleep=True):
#Init Zähler_Mit val initialisieren
self.count = init_val
self.sleep = sleep
def increment(self):
if self.sleep:
time.sleep(1)
self.count += 1
return self.count
#Erstellen Sie Zähler mit Anfangswerten von 0 und 100
counter1, counter2 = Counter.remote(0), Counter.remote(100)
Lassen Sie uns den Wert in jeder Phase der Ergebnisse aufzeichnen, während wir jeden Zähler dreimal erhöhen.
results = []
for _ in range(3):
results.append(counter1.increment.remote())
results.append(counter2.increment.remote())
print(ray.get(results)) #Ausgabe: [1, 101, 2, 102, 3, 103]
Es wurde insgesamt 6 Mal erhöht, aber da es für jeden Zähler parallelisiert wird, dauert es nur 3 Sekunden, um den Wert zu erhalten.
Wenn Sie die Methoden derselben Instanz von Actor parallel aufrufen möchten, können Sie auch eine Remote-Funktion definieren, die eine Instanz von Actor als Argument verwendet. Lassen Sie uns zum Beispiel eine Funktion namens "Inkrementierer" ausführen, die "Inkrement" jede Sekunde mit einer Verschiebung von 0,5 Sekunden wie folgt aufruft. Beachten Sie, dass wir hier einen "Zähler" haben, der das "Inkrement" selbst sofort beenden lässt.
@ray.remote
def incrementer(counter, id, times):
#Inkrementiere mal jede Sekunde
for _ in range(times):
cnt = counter.increment.remote()
print(f'id= {id} : count = {ray.get(cnt)}')
time.sleep(1)
counter = Counter.remote(0, sleep=False) #Ein Zähler, bei dem ein Inkrement augenblicklich endet
incrementer.remote(counter, id=1, times=5)
time.sleep(0.5) #Starten Sie 0.Um 5 Sekunden verschieben
inc = incrementer.remote(counter, id=2, times=5)
ray.wait([inc]) #Wird als nächstes erklärt,Funktion, um auf das Ende zu warten
Wenn Sie es ausführen, können Sie sehen, dass "Inkrementierer" abwechselnd alle 0,5 Sekunden den Wert von "Zähler" wie folgt aktualisiert:
(0.0 Sekunden später) id = 1 : count = 1
(0.5 Sekunden später) id = 2 : count = 2
(1.0 Sekunden später) id = 1 : count = 3
(1.5 Sekunden später) id = 2 : count = 4
(2.0 Sekunden später) ......
ray.wait Wenn Sie eine Liste von Objekt-IDs übergeben, die parallel zu "ray.get" ausgeführt werden, können Sie die Werte erst abrufen, wenn alle ausgeführt wurden. Wenn Sie "ray.wait" verwenden, wartet es, bis die angegebene Anzahl parallel ausgeführter Funktionen abgeschlossen ist, und gibt die ID zurück, die an diesem Punkt endete, und die ID, die dies nicht tat.
@ray.remote
def sleep(x):
#Eine Funktion, die x Sekunden ruht und x zurückgibt
time.sleep(x)
return x
ids = [sleep.remote(3), sleep.remote(5), sleep.remote(2)]
finished_ids, running_ids = ray.wait(ids, num_returns=2, timeout=None)
print(ray.get(finished_ids)) #Ausgabe(Nach 3 Sekunden): [3,2]
print(ray.get(running_ids)) #Ausgabe(Nach 5 Sekunden): [5]
ray.put Tatsächlich wird jedes an die "Remote" -Funktion übergebene Objekt implizit serialisiert und in Rays gemeinsamen Speicher kopiert. Wenn ein großes Objekt mehrmals an das Argument "remote" übergeben wird, dauert das Kopieren daher länger und der Bereich im gemeinsam genutzten Speicher wird verschwendet.
In solchen Fällen können Sie diese Verschwendung vermeiden, indem Sie nur einmal im Voraus explizit mit "ray.put" kopieren.
ray.put
gibt eine Objekt-ID wie remote
zurück und übergibt sie an die Remote-Funktion.
Nach dem Kopieren wird das Objekt freigegeben, sodass jeder parallel ausgeführte Mitarbeiter es sehen kann.
@ray.remote
def func4(obj, idx):
time.sleep(1)
return idx
# big_Das Objekt sei ein großes Objekt
big_object = None
big_obj_id = ray.put(big_object)
# func.remote()Wird viermal aufgerufen,Ich übergebe die Objekt-ID, also ist sie wieder groß_Das Kopieren eines Objekts erfolgt nicht
results = [func4.remote(big_obj_id, i) for i in range(4)]
print(ray.get(results)) #Ausgabe: [0, 1, 2, 3]
Beachten Sie, dass Rays ray.get
-Deserialisierung viel schneller zu sein scheint als pickle.load
.
Das offizielle Dokument wird ausführlicher verwendet. Insbesondere enthält Beispiele spezifische Beispiele wie Parameterserver und erweitertes Lernen in einer verteilten Umgebung, was hilfreich ist. Lasst uns. Es gibt auch ein auf Ray basierendes Framework auf hoher Ebene, [RLlib] für erweitertes Lernen (https://docs.ray.io/en/latest/rllib.html) und [für die Optimierung von Hyperparametern]. Tune](https://docs.ray.io/en/latest/tune.html) und so weiter. Lassen Sie uns mit Ray ein komfortables Leben in Parallelverarbeitung führen.
Recommended Posts