[PYTHON] Verteilte Architektur vom Mars implementiert

Dieser Artikel stellt die verteilte Architektur vor, die in Alibabas Open Source ** Mars ** implementiert ist.

Wie der Mars aussieht vorheriger Artikel, Aber nach dem Testen auf einem internen System [Open Source auf GitHub](https://www.alibabacloud.com/blog/mars-matrix-based-universal-distributed-computing-framework_594606?spm = a2c65.11461447.0.0.4be1c339z4ytI2). Dieser Artikel stellt die in Mars implementierte verteilte Ausführungsarchitektur vor.

Einführung der Architektur

Mars bietet eine Bibliothek für die verteilte Ausführung von Tensoren. Diese Bibliothek wurde unter Verwendung des in mars.actors implementierten Akteurmodells geschrieben und enthält Scheduler, Worker und Webdienste.

Das vom Client an Mars Web Service übermittelte Diagramm besteht aus Tensoren. Der Webdienst empfängt das Diagramm und sendet es an den Scheduler. Vor dem Senden eines Jobs an jeden Mitarbeiter kompiliert der Mars-Scheduler das Tensordiagramm zu einem Diagramm, das aus Blöcken und Operanden besteht, analysiert und unterteilt das Diagramm. Der Scheduler erstellt dann eine Reihe von OperandActors, die die Ausführung eines einzelnen Operanden auf allen Schedulern basierend auf einem konsistenten Hash steuern. Die Operanden sind in topologischer Reihenfolge geplant. Wenn alle Operanden ausgeführt werden, wird das gesamte Diagramm als vollständig markiert und der Client kann die Ergebnisse aus dem Web abrufen. Der gesamte Ausführungsprozess ist in der folgenden Abbildung dargestellt.

image.png

Job senden

Der Client sendet einen Job über die RESTful-API an den Mars-Service. Der Client schreibt Code in den Tensor, wandelt die Tensoroperation über "session.run (tensor)" in einen aus Tensoren zusammengesetzten Graphen um und sendet ihn an die Web-API. Die Web-API sendet den Job dann an den SessionActor und erstellt einen GraphActor für die Diagrammanalyse und -verwaltung im Cluster. Der Client beginnt mit der Abfrage des Ausführungsstatus des Diagramms, bis die Ausführung abgeschlossen ist.

GraphActor wandelt zuerst ein Tensordiagramm in ein Diagramm um, das gemäß den Blockeinstellungen aus Operanden und Blöcken besteht. Durch diesen Vorgang kann das Diagramm unterteilt und parallel ausgeführt werden. Danach wird eine Reihe von Analysen für das Diagramm durchgeführt, die Priorität des Operanden wird erhalten und dem Startoperanden wird ein Arbeiter zugewiesen. Informationen zu diesem Teil finden Sie unter "Vorbereiten des Ausführungsdiagramms". Erstellen Sie dann für jeden Operanden einen OperandActor, um die spezifische Ausführung des Operanden zu steuern. Wenn sich ein Operand im Status "BEREIT" befindet (wie im Abschnitt "Betriebszustand" beschrieben), wählt der Scheduler den Ziel-Worker für diesen Operanden aus und sendet diesem Worker einen Job zur tatsächlichen Ausführung.

Kontrolle der Ausführung

Wenn der Operand an den Worker gesendet wird, wartet der OperandActor auf einen Rückruf des Workers. Wenn der Operand erfolgreich ist, wird der Nachfolger des Operanden geplant. Wenn der Operand nicht ausgeführt werden kann, versucht OperandActor es mehrmals. Wenn dies fehlschlägt, wird der Lauf als fehlgeschlagen markiert.

Brechen Sie den Job ab

Der Client kann die RESTful-API verwenden, um einen laufenden Job abzubrechen. Die Abbruchanforderung wird in den Statusspeicher des Diagramms geschrieben und die Abbruchschnittstelle in GraphActor wird aufgerufen. Befindet sich der Job in der Vorbereitungsphase, wird er sofort beendet, nachdem die Stoppanforderung erkannt wurde. Andernfalls wird die Anforderung an jeden Operandenakteur gesendet und der Status auf CANCELING gesetzt. Wenn der Operand zu diesem Zeitpunkt nicht arbeitet, wird der Status des Operanden direkt auf CANCELED gesetzt. Wenn der Operand ausgeführt wird, wird eine Stoppanforderung an den Worker gesendet, ein ExecutionInterrupted-Fehler tritt auf und er wird an den OperatingActor zurückgegeben. Zu diesem Zeitpunkt wird der Status des Operanden als ABGEBROCHEN markiert.

Vorbereitung des Ausführungsgraphen

Wenn Sie ein Tensordiagramm an den Mars-Scheduler senden, wird abhängig von den in der Datenquelle enthaltenen Blockparametern ein feineres Diagramm generiert, das aus Operanden und Blöcken besteht.

Grafikkomprimierung

Reduzieren Sie nach dem Generieren des Blockdiagramms die Größe des Diagramms, indem Sie die benachbarten Knoten im Diagramm zusammenführen. Durch diese Verschmelzung können Sie auch Beschleunigungsbibliotheken wie numexpr optimal nutzen, um Ihre Berechnungen zu beschleunigen. Derzeit fusioniert Mars nur die Operanden, die eine einzige Kette bilden. Zum Beispiel, wenn Sie den folgenden Code ausführen.

import mars.tensor as mt
a = mt.random.rand(100, chunks=100)
b = mt.random.rand(100, chunks=100)
c = (a + b).sum()

Mars verschmilzt die Operanden ADD und SUM mit dem Knoten FUSE. Landoperanden werden nicht zusammengeführt, da ADD und SUM keine einfache gerade Linie bilden.

image.png

Erstbeschäftigung

Das Zuweisen von Workern zu Operanden ist wichtig, um die Leistung der Diagrammausführung zu verbessern. Die zufällige Zuweisung der anfänglichen Operanden erhöht den Netzwerk-Overhead und kann zu einer unausgewogenen Jobverteilung zwischen verschiedenen Mitarbeitern führen. Die Zuordnung von anderen Knoten als dem Anfangsknoten kann leicht gemäß der physischen Verteilung der vom Vorläufer erzeugten Daten und dem Ruhezustand jedes Arbeiters bestimmt werden. Daher berücksichtigt die Vorbereitungsphase des Ausführungsgraphen nur die Zuweisung von Anfangsoperanden.

Bei der Zuweisung von Erstarbeitern müssen verschiedene Grundsätze beachtet werden. Erstens sollten die jedem Arbeiter zugewiesenen Operanden so ausgewogen wie möglich sein. Dies ermöglicht es dem Rechencluster, während der gesamten Ausführungsphase eine höhere Auslastung zu haben, was besonders während der letzten Ausführungsphase wichtig ist. Zweitens erfordert die erste Knotenzuweisung minimalen Netzwerkverkehr, wenn nachfolgende Knoten ausgeführt werden. Mit anderen Worten sollte die anfängliche Knotenzuweisung vollständig dem Prinzip folgen.

Bitte beachten Sie, dass die oben genannten Grundsätze miteinander in Konflikt stehen können. Allokationslösungen mit minimalem Netzwerkverkehr können sehr verzerrt sein. Wir haben einen heuristischen Algorithmus entwickelt, um die beiden Ziele auszugleichen. Der Algorithmus wird wie folgt beschrieben: ::

  1. Der erste Anfangsknoten und die erste Maschine in der Liste werden ausgewählt.
  2. In dem aus dem Operandendiagramm konvertierten ungerichteten Diagramm wird die Suche nach Tiefenpriorität von diesem Knoten aus gestartet.
  3. Starten Sie in dem aus dem Operandendiagramm konvertierten ungerichteten Diagramm die Suche nach der Tiefenpriorität von diesem Knoten aus. Wenn auf einen anderen nicht zugewiesenen Anfangsknoten zugegriffen wird, weisen Sie ihn der in Schritt 1 ausgewählten Maschine zu.
  4. Starten Sie außerdem die Tiefenprioritätssuche aus dem Diagramm, das aus dem Operandendiagramm konvertiert wurde.
  5. Wenn noch Mitarbeiter vorhanden sind, denen kein Operand zugewiesen wurde, fahren Sie mit Schritt 1 fort. Wenn noch Mitarbeiter ohne zugewiesene Operanden vorhanden sind, fahren Sie mit Schritt 1 fort.

Planungsrichtlinie

Wenn ein aus Operanden bestehender Graph ausgeführt wird, kann eine ordnungsgemäße Ausführungsreihenfolge die vorübergehend im Cluster gespeicherte Datenmenge verringern und infolgedessen die Wahrscheinlichkeit verringern, dass die Daten auf die Festplatte übertragen werden. .. Die richtigen Mitarbeiter können den gesamten laufenden Netzwerkverkehr reduzieren.

Operandenauswahlrichtlinie

Eine gute Ausführungssequenz kann die Gesamtmenge der vorübergehend im Cluster gespeicherten Daten erheblich reduzieren. Die folgende Abbildung zeigt ein Beispiel für die Baumreduzierung. Kreise zeigen Operanden an, Quadrate zeigen Blöcke an, Rot zeigt an, dass die Operanden ausgeführt werden, Blau zeigt an, dass die Operanden ausführbar sind, Grün zeigt an, dass die von den Operanden erzeugten Blöcke gespeichert sind, und Grau zeigt an, dass die Operanden und ihre zugehörigen Daten freigegeben wurden. Ich werde. Die folgende Abbildung zeigt die Situation, in der zwei Mitarbeiter vorhanden sind und jeder Operand dieselbe Menge an Ressourcen verwendet und diese in 5-Stunden-Einheiten mit unterschiedlichen Richtlinien ausgeführt werden. Die Abbildung links zeigt, dass sie gemäß der Hierarchie ausgeführt werden, und die Abbildung rechts zeigt, dass sie in der Reihenfolge der Tiefenpriorität ausgeführt werden. In der Grafik links müssen vorübergehend Daten für 6 Blöcke gespeichert werden, und in der Grafik rechts müssen nur Daten für 2 Blöcke gespeichert werden.

image.png

Da unser Ziel darin besteht, die Gesamtmenge der im Cluster gespeicherten Daten zu reduzieren, priorisieren wir die Operanden im Status READY.

  1. Sie müssen zuerst den tiefen Operanden ausführen. In 2 müssen Operanden, die von tieferen Operanden abhängen, zuerst ausgeführt werden. In 3 sollte der Knoten mit der kleineren Ausgabegröße zuerst ausgeführt werden.

Richtlinie zur Arbeitnehmerauswahl

Wenn der Scheduler bereit ist, das Diagramm auszuführen, wurde der Worker für den ersten Operanden bestimmt. Weist Worker für nachfolgende Operanden basierend auf dem Worker mit Eingabedaten zu. Wenn es einen Worker mit der größten Größe von Eingabedaten gibt, wird dieser Worker ausgewählt, um nachfolgende Operanden auszuführen. Wenn mehrere Mitarbeiter mit derselben Eingabedatengröße vorhanden sind, spielt der Ressourcenstatus jedes Kandidatenarbeiters eine entscheidende Rolle.

Zustand des Operanden

Jeder Mars-Operator wird vom Operating Actor individuell geplant. Die Ausführungsverarbeitung ist eine Zustandsübergangsverarbeitung. OperandActor definiert eine Zustandsübergangsfunktion beim Eingeben jedes Zustands. Bei der Initialisierung befindet sich der anfängliche Operand im Status READY und der nicht initiale Operand im Status UNSCHEDULED. Wenn die angegebene Bedingung erfüllt ist, wechselt der Operand in einen anderen Zustand und die entsprechende Operation wird ausgeführt. Die folgende Abbildung zeigt den Statusübergangsprozess.

image.png

Im Folgenden werden die Bedeutung jedes Zustands und die Operationen beschrieben, die Mars in diesen Zuständen ausführt.

--UNSCHEDULED: Der Status, in dem die Upstream-Daten des Operanden nicht bereit sind. --READY: Der Operand befindet sich in einem Zustand, in dem die Upstream-Daten dieses Operanden nicht bereit sind. Verwenden Sie in solchen Fällen Eigenschaften, um den Wert von Eigenschaften zu ändern. Der Scheduler sendet eine Stoppnachricht an andere Mitarbeiter und eine Nachricht an den Mitarbeiter, um den Job auszuführen. --RUNNING: Der Operand befindet sich in diesem Zustand, wenn seine Ausführung gestartet wird. In diesem Fall prüft der OperatingActor, ob der Auftrag übergeben wurde. In diesem Fall prüft der Operating Actor, ob ein Auftrag übergeben wurde. Der Bediener registriert dann den Rückruf beim Mitarbeiter und erhält eine Nachricht, dass der Auftrag abgeschlossen ist.

--FINISHED: Wenn sich der Operand in diesem Zustand befindet, wird eine Nachricht an den GraphActor gesendet, um festzustellen, ob die Ausführung des gesamten Graphen abgeschlossen ist. Wenn sich die Operanden in diesem Zustand befinden und kein Nachfolger vorhanden ist, wird eine Nachricht an den GraphActor gesendet, um festzustellen, ob die Ausführung des gesamten Diagramms abgeschlossen ist. Gleichzeitig sendet der OperandActor eine Nachricht an seine Vorläufer und Nachfolger, die angibt, dass die Ausführung abgeschlossen ist. Der Vorläufer, der die Nachricht empfängt, prüft, ob alle Nachfolger die Ausführung beendet haben. In diesem Fall können Sie die Daten für den aktuellen Operanden freigeben. Wenn der Nachfolger die Nachricht empfängt, prüft er, ob alle Vorläufer abgeschlossen sind. In diesem Fall können Sie den Status des Nachfolgers auf BEREIT ändern. --FREED: Der Operand befindet sich in diesem Zustand, wenn alle Daten freigegeben wurden. --FATAL: Der Operand befindet sich in diesem Zustand, wenn alle seine Daten freigegeben wurden. Der Operand befindet sich in diesem Zustand, wenn alle Wiederholungsversuche fehlschlagen. Wenn sich der Operand in diesem Zustand befindet, übergibt der Operand den gleichen Zustand an den Nachfolgeknoten. --CANCELLING: Dieser Status tritt auf, wenn der Operand abgebrochen wird. Wenn der Operand ausgeführt wird, senden Sie eine Anforderung an den Worker, um die Ausführung abzubrechen. --CANCELLED: Der Status, in dem der Operand abgebrochen wird. Dies ist der Zustand, in dem der Operand abgebrochen und die Ausführung gestoppt wird. Wenn der Lauf in diesen Zustand wechselt, versucht der Operating Actor, alle nachfolgenden Zustände in CANCELING zu verschieben.

Arbeitsinhalt

Mars-Mitarbeiter umfassen mehrere Prozesse, um die Auswirkungen von GIL zur Laufzeit zu verringern. Bestimmte Ausführungen werden in einem separaten Prozess ausgeführt. Um unnötige Speicherkopien und die Kommunikation zwischen Prozessen zu reduzieren, verwenden Mars-Mitarbeiter gemeinsam genutzten Speicher, um Ausführungsergebnisse zu speichern.

Wenn ein Job an einen Mitarbeiter übergeben wird, wird er zuerst für die Speicherzuweisung in die Warteschlange gestellt. Wenn Speicher zugewiesen wird, werden die Daten anderer Worker und die Daten, die auf der Festplatte des aktuellen Workers gespeichert sind, erneut in den Speicher geladen. Zu diesem Zeitpunkt befinden sich bereits alle für die Berechnung erforderlichen Daten im Speicher und Sie können den eigentlichen Berechnungsprozess starten. Wenn die Berechnung abgeschlossen ist, legt der Worker den Job im gemeinsam genutzten Speicher ab. Die Übergangsbeziehung zwischen den vier Ausführungszuständen ist in der folgenden Abbildung dargestellt.

image.png

Kontrolle der Ausführung

Der Mars-Worker steuert die Ausführung aller Operatoren innerhalb des Workers über den ExecutionActor. Der Akteur selbst ist nicht an der eigentlichen Operation oder Datenübertragung beteiligt, sondern übermittelt die Aufgabe lediglich an andere Akteure.

Der Scheduler OperandActor sendet den Job über den Aufruf "enqueue_graph" des ExecutionActor an den Worker. Der Worker akzeptiert die Eingabe des Operanden und speichert sie in der Warteschlange zwischen. Zu diesem Zeitpunkt akzeptiert der Worker den Posten des Operanden des Workers und speichert ihn in der Warteschlange zwischen. Wenn der Scheduler beschließt, den Operanden für den aktuellen Worker auszuführen, ruft er die Methode "start_execution" auf und registriert den Rückruf über "add_finish_callback". Dieses Design ermöglicht den Empfang von Ausführungsergebnissen an mehreren Standorten, was für die Notfallwiederherstellung hilfreich ist.

ExecutionActor verwendet das Modul "mars.promise", um Ausführungsanforderungen von mehreren Operanden gleichzeitig zu verarbeiten. Bestimmte Ausführungsschritte werden über die "then" -Methode der Promise-Klasse verknüpft. Wenn das endgültige Ausführungsergebnis gespeichert ist, wird der zuvor registrierte Rückruf ausgelöst. Wenn in einem der vorherigen Ausführungsschritte ein Fehler auftritt, wird der Fehler zur Verarbeitung an die in der catch-Methode registrierte Handlerfunktion übergeben.

Sortieren von Operanden

In solchen Fällen sendet der Scheduler eine große Anzahl von Operanden an den ausgewählten Worker. Daher ist für die meisten Ausführungszeiten die Anzahl der an den Worker gesendeten Operanden normalerweise größer als die Gesamtzahl der Operanden, die der Worker verarbeiten kann. Der Worker muss die Operanden sortieren und einige der auszuführenden Operanden auswählen. Dieser Sortiervorgang wird vom TaskQueueActor ausgeführt, der eine Prioritätswarteschlange verwaltet, in der Informationen zu den Operanden gespeichert werden. Gleichzeitig führt der TaskQueueActor die Jobzuweisungsaufgabe regelmäßig aus und weist dem obersten Operanden in der Prioritätswarteschlange Ausführungsressourcen zu, bis keine Ressourcen mehr zum Ausführen der Operanden vorhanden sind. Dieser Zuordnungsprozess wird auch ausgelöst, wenn ein neuer Operand gesendet wird oder wenn die Ausführung des Operanden abgeschlossen ist.

Speicherverwaltung

Marsarbeiter verwalten zwei Aspekte des Gedächtnisses. Der erste Teil ist der private Speicher jedes Arbeitsprozesses, den jeder Prozess besitzt. Der andere ist der von allen Prozessen gemeinsam genutzte Speicher, der von [plasma_store in Apache Arrow] gehalten wird (https://arrow.apache.org/docs/python/plasma.html?spm=a2c65.11461447.0.0.4be1c339z4ytI2). Ich bin.

Um einen Überlauf des Prozessspeichers zu vermeiden, haben wir einen QuotaActor auf Arbeiterebene eingeführt, der den Prozessspeicher zuweist. Vor dem Start der Ausführung des Operanden sendet der Operand einen Stapel von Speicheranforderungen für Eingabe- und Ausgabestücke an QuotaActor. Wenn der verbleibende Speicherplatz die Anforderung erfüllen kann, wird die Anforderung von QuotaActor akzeptiert. Andernfalls wird die Anforderung in die Warteschlange gestellt, um auf freie Ressourcen zu warten. Wenn der zugehörige Speicher freigegeben wird, wird auch die angeforderte Ressource freigegeben. Zu diesem Zeitpunkt kann QuotaActor anderen Operanden Ressourcen zuweisen.

Der gemeinsam genutzte Speicher wird von plasma_store verwaltet und belegt normalerweise 50% des gesamten Speichers. Da keine Möglichkeit eines Überlaufs besteht, wird dieser Teil des Speichers direkt über die zugehörige plasma_store-Methode und nicht über QuotaActor zugewiesen. Wenn der gemeinsam genutzte Speicher belegt ist, wirft der Mars-Worker nicht verwendete Chunks auf die Festplatte, um Speicherplatz für neue Chunks freizugeben.

Die Daten in Blöcken, die vom gemeinsam genutzten Speicher auf die Festplatte ausgegeben werden, können von nachfolgenden Operanden wiederverwendet werden. Das erneute Laden von Daten von der Festplatte in den gemeinsam genutzten Speicher ist jedoch besonders erschöpft und wird geladen. Es kann eine Menge E / A-Ressourcen erfordern, wenn Sie andere Blöcke auf die Festplatte kopieren müssen, um einen Block aufzunehmen. Wenn Sie also keine gemeinsame Nutzung von Daten benötigen (z. B. wenn Chunks nur in einem Operanden verwendet werden), laden Sie die Chunks direkt in den privaten Speicher des Prozesses anstatt in den gemeinsam genutzten Speicher. Dies kann die Gesamtausführungszeit des Jobs erheblich reduzieren.

Zukünftige Arbeit

Der Mars iteriert derzeit schnell. Wir erwägen, in naher Zukunft Failover- und Shuffle-Support auf Worker-Ebene zu implementieren, und planen auch Failover auf Scheduler-Ebene.

Recommended Posts

Verteilte Architektur vom Mars implementiert