Einführung in die verteilte Parallelverarbeitung von Python durch Ray

Was ist Ray?

Ray ist ein Framework, mit dem Sie schnell und einfach verteilte Parallelverarbeitung in Python schreiben können. Es soll das Parallelisieren von vorhandenem Code vereinfachen. Mit Ray können Sie die Parallelverarbeitung auf Prozessebene einfacher schreiben als die Mehrfachverarbeitung.

Dieser Artikel basiert auf dem Inhalt von Ray Tutorial. Es wurde bestätigt, dass der Code mit Python 3.8.2, Ray 0.8.4 funktioniert.

Installation

Sie können es von Pip usw. im Terminal installieren.

$ pip install ray

Wie benutzt man

Es gibt nur drei grundlegende Verwendungszwecke: ray.init`` ray.remote`` ray.get, und in diesem Artikel wird auch ray.wait ray.put vorgestellt.

Grundlagen der Parallelisierung mit Ray

Ziehen Sie in Betracht, die Ausführung von func für den folgenden Code zu parallelisieren, wobei die Funktion func, deren Ausführung 3 Sekunden dauert, zweimal aufgerufen wird und die gesamte Ausführung 6 Sekunden dauert.

import time

def func(x):
    time.sleep(3)
    return x

begin_time = time.time() #Startzeit aufzeichnen

res1, res2 = func(1), func(2) #Rufen Sie func zweimal an
print(res1, res2)  #Ausgabe: 1 2

end_time = time.time() #Notieren Sie die Endzeit

print(end_time - begin_time) #Etwa 6 Sekunden

Wenn Sie Ray verwenden, müssen Sie ** immer zuerst ** die Anzahl der Ressourcen angeben, die in ray.init verwendet werden sollen, und den Ray-Prozess starten.

import ray

# ray.init()Wenn Sie nicht explizit like angeben, wird die Anzahl der Ressourcen automatisch ermittelt.
ray.init(num_cpus=4)

#Warten Sie ein wenig, bis Ray startet, um eine genauere Zeitmessung zu ermöglichen
time.sleep(1)

Wenn Sie eine Funktion parallel ausführen möchten, müssen Sie sie zu einer ** Remote-Funktion ** machen, die Ray verarbeiten kann. Das heißt, es ist einfach, fügen Sie einfach @ ray.remote und einen Dekorateur zur Funktion hinzu. Wenn Sie die Remote-Funktion als "(Funktionsname) .remote (Argument)" aufrufen, wird sie zur Ausführung an Rays parallele Mitarbeiter gesendet. .remote (argument) gibt ** Object ID ** zurück, ohne darauf zu warten, dass es beendet wird.

@ray.remote
def func(x):
    time.sleep(3)
    return x

begin_time = time.time()
res1, res2 = func.remote(1), func.remote(2)
print(res1) #Ausgabebeispiel: ObjectID(45b9....) 

Wenn Sie das Ergebnis erhalten möchten, können Sie die von der Remote-Funktion zurückgegebene Objekt-ID an ray.get übergeben. ray.get blockiert, bis alle der Objekt-ID entsprechenden Ergebnisse erhalten wurden.

print(ray.get(res1), ray.get(res2)) #Ausgabe: 1 2

# ray.get kann auch eine Liste erhalten
print(ray.get([res1, res2])) #Ausgabe: [1, 2]

end_time = time.time()
print(end_time - begin_time) #Ca. 3 Sekunden

Wenn der obige Code als ein Skript ausgeführt wird, dauert es nur etwa 3 Sekunden, und Sie können sehen, dass die Ausführung von func parallelisiert ist. Das sind alle Grundlagen.

Abhängige Parallelisierung

Ray kann auch dann verarbeiten, wenn eine Abhängigkeit zwischen den Remote-Funktionen besteht, indem die Objekt-ID unverändert übergeben wird. Die übergebene Objekt-ID wird in einem normalen Python-Objekt wiederhergestellt und ausgeführt, wenn es tatsächlich ausgeführt wird. Im folgenden Beispiel werden "func1" und "func2" nacheinander auf jedes der vier Elemente in "vec" angewendet. Die Verarbeitung eines Elements dauert 2 Sekunden.

@ray.remote
def func1(x):
    time.sleep(1)
    return x * 2

@ray.remote
def func2(x):
    time.sleep(1)
    return x + 1

vec = [1, 2, 3, 4]
results = []

for x in vec:
    res = func1.remote(x)  #objectID ist in res enthalten
    res = func2.remote(res) #Übergeben Sie die ObjectID unverändert an die nächste Remote-Funktion
    results.append(res)

#Ergebnisse ist eine Liste von ObjectIDs
print(ray.get(results)) #Ausgabe: [3, 5, 7, 9]

Ray analysiert die Abhängigkeiten, führt zuerst "func1" ohne Abhängigkeiten aus und führt dann "func2" parallel für die verarbeiteten Elemente von "func1" aus. Dieser Vorgang, der nacheinander 8 Sekunden dauert, wird durch Parallelisierung in etwa 2 Sekunden ausgeführt.

Ray unterstützt auch verschachtelte Aufrufe, und das Umschreiben von "func2" wie folgt funktioniert einwandfrei. Die einzige Bedingung für einen verschachtelten Aufruf ist, dass die Funktion, die Sie aufrufen möchten, vordefiniert ist.

@ray.remote
def func2(x):
    x = func1.remote(x)  #ObjectID wurde zurückgegeben
    time.sleep(1)
    return ray.get(x) + 1 #Weil es nicht direkt zur Objekt-ID hinzugefügt werden kann, ray.Holen Sie sich und berechnen Sie dann

print(ray.get([func2.remote(x) for x in vec])) #Ausgabe: [3, 5, 7, 9]

Der gemessene Wert in meiner Umgebung ist etwas langsamer als 2 Sekunden, kann aber parallel schneller als 8 Sekunden ausgeführt werden.

Actor Die Remote-Funktion kehrt nach der Ausführung unverändert zurück und kann keinen Status haben. In Ray wird eine Verarbeitung mit einem Status realisiert, indem die Klasse mit "@ ray.remote" geändert wird. Mit @ ray.remote qualifizierte Klassen heißen ** Actor **.

Betrachten Sie beispielsweise den folgenden Zähler, der 1 Sekunde pro Inkrement benötigt. Fügen Sie beim Erstellen einer Instanz von Actor ".remote ()" hinzu, wie im Fall eines Funktionsaufrufs.

@ray.remote
class Counter:
    def __init__(self, init_val, sleep=True):
        #Init Zähler_Mit val initialisieren
        self.count = init_val
        self.sleep = sleep

    def increment(self):
        if self.sleep:
            time.sleep(1)
        self.count += 1
        return self.count

#Erstellen Sie Zähler mit Anfangswerten von 0 und 100
counter1, counter2 = Counter.remote(0), Counter.remote(100)

Lassen Sie uns den Wert in jeder Phase der Ergebnisse aufzeichnen, während wir jeden Zähler dreimal erhöhen.

results = []
for _ in range(3):
    results.append(counter1.increment.remote())
    results.append(counter2.increment.remote())

print(ray.get(results)) #Ausgabe: [1, 101, 2, 102, 3, 103]

Es wurde insgesamt 6 Mal erhöht, aber da es für jeden Zähler parallelisiert wird, dauert es nur 3 Sekunden, um den Wert zu erhalten.

Wenn Sie die Methoden derselben Instanz von Actor parallel aufrufen möchten, können Sie auch eine Remote-Funktion definieren, die eine Instanz von Actor als Argument verwendet. Lassen Sie uns zum Beispiel eine Funktion namens "Inkrementierer" ausführen, die "Inkrement" jede Sekunde mit einer Verschiebung von 0,5 Sekunden wie folgt aufruft. Beachten Sie, dass wir hier einen "Zähler" haben, der das "Inkrement" selbst sofort beenden lässt.

@ray.remote
def incrementer(counter, id, times):
    #Inkrementiere mal jede Sekunde
    for _ in range(times):
        cnt = counter.increment.remote()
        print(f'id= {id} : count = {ray.get(cnt)}')
        time.sleep(1)

counter = Counter.remote(0, sleep=False) #Ein Zähler, bei dem ein Inkrement augenblicklich endet

incrementer.remote(counter, id=1, times=5)
time.sleep(0.5)  #Starten Sie 0.Um 5 Sekunden verschieben
inc = incrementer.remote(counter, id=2, times=5)

ray.wait([inc]) #Wird als nächstes erklärt,Funktion, um auf das Ende zu warten

Wenn Sie es ausführen, können Sie sehen, dass "Inkrementierer" abwechselnd alle 0,5 Sekunden den Wert von "Zähler" wie folgt aktualisiert:

(0.0 Sekunden später) id = 1 : count = 1
(0.5 Sekunden später) id = 2 : count = 2
(1.0 Sekunden später) id = 1 : count = 3
(1.5 Sekunden später) id = 2 : count = 4
(2.0 Sekunden später) ......

ray.wait Wenn Sie eine Liste von Objekt-IDs übergeben, die parallel zu "ray.get" ausgeführt werden, können Sie die Werte erst abrufen, wenn alle ausgeführt wurden. Wenn Sie "ray.wait" verwenden, wartet es, bis die angegebene Anzahl parallel ausgeführter Funktionen abgeschlossen ist, und gibt die ID zurück, die an diesem Punkt endete, und die ID, die dies nicht tat.

@ray.remote
def sleep(x):
    #Eine Funktion, die x Sekunden ruht und x zurückgibt
    time.sleep(x)
    return x

ids = [sleep.remote(3), sleep.remote(5), sleep.remote(2)]
finished_ids, running_ids = ray.wait(ids, num_returns=2, timeout=None)

print(ray.get(finished_ids)) #Ausgabe(Nach 3 Sekunden): [3,2] 
print(ray.get(running_ids))  #Ausgabe(Nach 5 Sekunden): [5]

ray.put Tatsächlich wird jedes an die "Remote" -Funktion übergebene Objekt implizit serialisiert und in Rays gemeinsamen Speicher kopiert. Wenn ein großes Objekt mehrmals an das Argument "remote" übergeben wird, dauert das Kopieren daher länger und der Bereich im gemeinsam genutzten Speicher wird verschwendet.

In solchen Fällen können Sie diese Verschwendung vermeiden, indem Sie nur einmal im Voraus explizit mit "ray.put" kopieren. ray.put gibt eine Objekt-ID wie remote zurück und übergibt sie an die Remote-Funktion. Nach dem Kopieren wird das Objekt freigegeben, sodass jeder parallel ausgeführte Mitarbeiter es sehen kann.

@ray.remote
def func4(obj, idx):
    time.sleep(1)
    return idx

# big_Das Objekt sei ein großes Objekt
big_object = None

big_obj_id = ray.put(big_object)

# func.remote()Wird viermal aufgerufen,Ich übergebe die Objekt-ID, also ist sie wieder groß_Das Kopieren eines Objekts erfolgt nicht
results = [func4.remote(big_obj_id, i) for i in range(4)]

print(ray.get(results)) #Ausgabe: [0, 1, 2, 3]

Beachten Sie, dass Rays ray.get-Deserialisierung viel schneller zu sein scheint als pickle.load.

abschließend

Das offizielle Dokument wird ausführlicher verwendet. Insbesondere enthält Beispiele spezifische Beispiele wie Parameterserver und erweitertes Lernen in einer verteilten Umgebung, was hilfreich ist. Lasst uns. Es gibt auch ein auf Ray basierendes Framework auf hoher Ebene, [RLlib] für erweitertes Lernen (https://docs.ray.io/en/latest/rllib.html) und [für die Optimierung von Hyperparametern]. Tune](https://docs.ray.io/en/latest/tune.html) und so weiter. Lassen Sie uns mit Ray ein komfortables Leben in Parallelverarbeitung führen.

Referenzseite

Recommended Posts

Einführung in die verteilte Parallelverarbeitung von Python durch Ray
So führen Sie eine Mehrkern-Parallelverarbeitung mit Python durch
Eine Einführung in die Python-Programmierung
[Kapitel 5] Einführung in Python mit 100 Klopfen Sprachverarbeitung
Lesehinweis: Einführung in die Datenanalyse mit Python
[Kapitel 3] Einführung in Python mit 100 Klopfen Sprachverarbeitung
[Kapitel 2] Einführung in Python mit 100 Klopfen Sprachverarbeitung
[Kapitel 4] Einführung in Python mit 100 Klopfen Sprachverarbeitung
[Python] Einfache Parallelverarbeitung mit Joblib
Erste Schritte mit Python für Nicht-Ingenieure
[Python Tutorial] Eine einfache Einführung in Python
Einführung in das Auffüllen von Python-Bildern Auffüllen von Bildern mit ImageDataGenerator
[Einführung in Python] Verwenden wir foreach mit Python
Eine Einführung in Python für maschinelles Lernen
Eine Einführung in Python für C-Sprachprogrammierer
Erstellen einer exe-Datei mit Python PyInstaller: PC friert bei paralleler Verarbeitung ein
Was ist ein Algorithmus? Einführung in den Suchalgorithmus] ~ Python ~
[Python] Einfache Einführung in das maschinelle Lernen mit Python (SVM)
Einführung in die künstliche Intelligenz mit Python 1 "Genetic Algorithm-Theory-"
Markov Chain Artificial Brainless mit Python + Janome (1) Einführung in Janome
Markov-Kette Künstlich Gehirnlos mit Python + Janome (2) Einführung in die Markov-Kette
Einführung in die künstliche Intelligenz mit Python 2 "Genetic Algorithm-Practice-"
Senden Sie eine E-Mail mit Python an Spushis Adresse
Einführung in Tornado (1): Python Web Framework mit Tornado gestartet
So beschneiden Sie ein Bild mit Python + OpenCV
Einführung in den Formationsflug mit Tello edu (Python)
Einführung in Python mit Atom (unterwegs)
Einführung in Python "Re" 1 Erstellen einer Ausführungsumgebung
Einführung in das Generalized Linear Model (GLM) von Python
Parallele Verarbeitung ohne tiefe Bedeutung in Python
[Einführung in die Udemy Python3 + -Anwendung] 9. Drucken Sie zunächst mit print
Python verteilte Verarbeitung Spartan
Einführung in die Python-Sprache
Einführung in OpenCV (Python) - (2)
Bildverarbeitung mit Python
Parallelverarbeitung mit Mehrfachverarbeitung
[Einführung in Python] Verwendung der while-Anweisung (wiederholte Verarbeitung)
[Einführung in Python] Wie iteriere ich mit der Bereichsfunktion?
Einführung in die Mathematik ab Python Study Memo Vol.1
[Kapitel 6] Einführung in Scicit-Learn mit 100 Klopfen Sprachverarbeitung
Ich habe versucht, künstliches Perzeptron mit Python zu implementieren
Erstellen einer Umgebung für die Verarbeitung natürlicher Sprache mit Python
Bildverarbeitung mit Python (Teil 2)
100 Sprachverarbeitungsklopfen mit Python 2015
Stellen Sie mit Python eine Verbindung zu BigQuery her
opencv-python Einführung in die Bildverarbeitung
Parallelverarbeitung mit lokalen Funktionen
Einführung in Python Django (2) Win
"Apple-Verarbeitung" mit OpenCV3 + Python3
Einführung in Private TensorFlow
Eine Einführung in das maschinelle Lernen
Stellen Sie mit Python eine Verbindung zu Wikipedia her
Akustische Signalverarbeitung mit Python (2)
Post to Slack mit Python 3
Einführung in RDB mit sqlalchemy Ⅰ
Einführung in die serielle Kommunikation [Python]
Parallele Verarbeitung mit Parallel von Scikit-Learn
Bildverarbeitung mit Python (Teil 1)