[PYTHON] Architecture distribuée implémentée par Mars

Cet article présente l'architecture distribuée implémentée dans l'open source ** Mars ** d'Alibaba.

À quoi ressemble Mars article précédent, Mais après avoir testé sur un système interne, [Open source sur GitHub](https://www.alibabacloud.com/blog/mars-matrix-based-universal-distributed-computing-framework_594606?spm = a2c65.11461447.0.0.4be1c339z4ytI2). Cet article présente l'architecture d'exécution distribuée implémentée dans Mars.

Introduction de l'architecture

Mars fournit une bibliothèque pour l'exécution distribuée de tenseurs. Cette bibliothèque est écrite en utilisant le modèle d'acteur implémenté dans mars.actors et comprend des planificateurs, des workers et des services Web.

Le graphe soumis par le client à Mars Web Service est constitué de tenseurs. Le service Web reçoit le graphique et le soumet au planificateur. Avant de soumettre un travail à chaque ouvrier, le planificateur de Mars compile le graphe tensoriel en un graphe composé de morceaux et d'opérandes, analyse et divise le graphe. Le planificateur crée ensuite un ensemble d'OperandActors qui contrôlent l'exécution d'un seul opérande sur tous les planificateurs, sur la base d'un hachage cohérent. Les opérandes sont planifiés dans l'ordre topologique. Lorsque tous les opérandes sont exécutés, le diagramme entier est marqué comme complet et le client peut récupérer les résultats sur le Web. L'ensemble du processus d'exécution est illustré dans la figure ci-dessous.

image.png

Soumettre le travail

Le client soumet un travail au service Mars via l'API RESTful. Le client écrit le code dans le tenseur, transforme l'opération tenseur en un graphe composé de tenseurs via session.run (tensor) et l'envoie à l'API Web. L'API Web envoie ensuite le travail à SessionActor et crée un GraphActor pour l'analyse et la gestion des graphiques dans le cluster. Le client commence à interroger l'état d'exécution du graphique jusqu'à ce qu'il termine son exécution.

GraphActor transforme d'abord un graphe tensoriel en un graphe composé d'opérandes et de blocs selon les paramètres de bloc. Ce processus permet au graphique d'être subdivisé et exécuté en parallèle. Après cela, une série d'analyses est effectuée sur le graphique, la priorité de l'opérande est obtenue et un opérateur est affecté à l'opérande de départ. Pour cette partie, reportez-vous à "Préparation du graphe d'exécution". Ensuite, pour chaque opérande, créez un OperandActor pour contrôler l'exécution spécifique de l'opérande. Lorsqu'un opérande est à l'état READY (comme décrit dans la section ʻEtat de l'Opérande`), le planificateur sélectionne le worker cible pour cet opérande et soumet un travail à ce worker pour une exécution réelle.

Contrôle de l'exécution

Lorsque l'opérande est envoyé au worker, OperandActor attend un rappel sur le worker. Si l'opérande réussit, le successeur de l'opérande est planifié. Si l'opérande ne s'exécute pas, OperandActor essaiera plusieurs fois. En cas d'échec, l'exécution est marquée comme ayant échoué.

Annuler le travail

Le client peut utiliser l'API RESTful pour annuler une tâche en cours d'exécution. La demande d'annulation est écrite dans la mémoire d'état du graphique et l'interface d'annulation sur GraphActor est appelée. Si le travail est en phase préparatoire, il se terminera immédiatement après la détection de la demande d'arrêt, sinon la demande sera envoyée à chaque acteur d'opérande et l'état sera mis à ANNULER. À ce stade, si l'opérande ne fonctionne pas, l'état de l'opérande est défini directement sur ANNULÉ. Si l'opérande est en cours d'exécution, une demande d'arrêt est envoyée au travailleur, une erreur ExecutionInterrupted se produit et elle est renvoyée à OperatingActor. À ce stade, l'état de l'opérande est marqué comme ANNULÉ.

Préparation du graphe d'exécution

Lorsque vous soumettez un graphe tensoriel au planificateur Mars, un graphe plus fin composé d'opérandes et de blocs est généré, en fonction des paramètres de bloc contenus dans la source de données.

Compression graphique

Une fois le graphe de blocs généré, réduisez la taille du graphe en fusionnant les nœuds adjacents dans le graphe. Cette fusion vous permet également de profiter pleinement des bibliothèques d'accélération comme numexpr pour accélérer vos calculs. Actuellement, Mars fusionne uniquement les opérandes qui forment une seule chaîne. Par exemple, si vous exécutez le code suivant.

import mars.tensor as mt
a = mt.random.rand(100, chunks=100)
b = mt.random.rand(100, chunks=100)
c = (a + b).sum()

Mars fusionne les opérandes ADD et SUM dans le nœud FUSE. Les opérandes de terrain ne fusionnent pas car ADD et SUM ne forment pas une simple ligne droite.

image.png

Affectation initiale des travailleurs

L'affectation de nœuds de calcul à des opérandes est importante pour améliorer les performances d'exécution des graphiques. L'attribution aléatoire des opérandes initiaux augmente la surcharge du réseau et peut conduire à des affectations de tâches déséquilibrées entre différents travailleurs. L'affectation de nœuds autres que le nœud initial peut être facilement déterminée en fonction de la distribution physique des données générées par le précurseur et de l'état inactif de chaque travailleur. Par conséquent, l'étape de préparation du graphe d'exécution ne considère que l'affectation des opérandes initiaux.

Il existe plusieurs principes à suivre concernant l'affectation des premiers travailleurs. Tout d'abord, les opérandes attribués à chaque travailleur doivent être aussi équilibrés que possible. Cela permet au cluster de calcul d'avoir une utilisation plus élevée pendant toute la phase d'exécution, ce qui est particulièrement important pendant la phase finale de l'exécution. Deuxièmement, l'allocation du premier nœud nécessite un trafic réseau minimal lors de l'exécution des nœuds suivants. En d'autres termes, l'attribution initiale des nœuds doit suivre complètement le principe.

Veuillez noter que les principes ci-dessus peuvent entrer en conflit les uns avec les autres. Les solutions d'allocation avec un trafic réseau minimal peuvent être très déformées. Nous avons développé un algorithme heuristique pour équilibrer les deux objectifs. L'algorithme est décrit comme suit: :

  1. Le premier nœud initial et la première machine de la liste sont sélectionnés.
  2. Dans le graphe non orienté converti à partir du graphe d'opérande, la recherche de priorité de profondeur est lancée à partir de ce nœud.
  3. Dans le graphe non orienté converti à partir du graphe d'opérande, lancez la recherche de priorité de profondeur à partir de ce nœud; si un autre nœud initial non attribué est accédé, affectez-le à la machine sélectionnée à l'étape 1.
  4. Démarrez également la recherche de priorité de profondeur à partir du graphique converti à partir du graphique d'opérande.
  5. S'il y a encore des travailleurs auxquels aucun opérande n'a été attribué, passez à l'étape 1. S'il reste des nœuds de calcul sans opérandes affectés, passez à l'étape 1.

Politique de planification

Lorsqu'un graphe composé d'opérandes est exécuté, un ordre d'exécution approprié peut réduire la quantité de données stockées temporairement dans le cluster et, par conséquent, réduire la probabilité que les données soient vidées sur le disque. .. Les bons employés peuvent réduire le trafic réseau total en cours d'exécution.

Politique de sélection des opérandes

Une bonne séquence d'exécution peut réduire considérablement la quantité totale de données stockées temporairement dans le cluster. La figure suivante montre un exemple de réduction d'arbres. Les cercles indiquent les opérandes, les carrés indiquent les blocs, le rouge indique que les opérandes sont en cours d'exécution, le bleu indique que les opérandes sont exécutables, le vert indique que les blocs générés par les opérandes sont stockés et le gris indique que les opérandes et leurs données associées ont été libérés. Je vais. La figure ci-dessous montre la situation où il y a deux nœuds de calcul et chaque opérande utilise la même quantité de ressources, et ils sont exécutés en unités de 5 heures avec des politiques différentes. La figure de gauche montre qu'ils sont exécutés selon la hiérarchie, et la figure de droite montre qu'ils sont exécutés dans l'ordre de priorité de profondeur. Le graphique de gauche doit stocker temporairement des données pour 6 blocs, et le graphique de droite doit stocker des données pour seulement 2 blocs.

image.png

Notre objectif est de réduire la quantité totale de données stockées dans le cluster, nous priorisons donc les opérandes dans l'état READY.

  1. Vous devez d'abord exécuter l'opérande profond. 2, les opérandes qui dépendent d'opérandes plus profonds doivent être exécutés en premier. 3, le nœud avec la plus petite taille de sortie doit être exécuté en premier.

Politique de sélection des travailleurs

Lorsque le planificateur est prêt à exécuter le graphique, le travailleur du premier opérande a été déterminé. Affecte des nœuds de calcul aux opérandes suivants en fonction du nœud de calcul avec les données d'entrée. S'il existe un travailleur avec la plus grande taille de données d'entrée, ce travailleur est sélectionné pour exécuter les opérandes suivants. Lorsqu'il y a plusieurs nœuds de calcul avec la même taille de données d'entrée, l'état des ressources de chaque collaborateur candidat joue un rôle déterminant.

État de l'opérande

Chaque opérateur Mars est programmé individuellement par l'acteur opérationnel. Le traitement d'exécution est un traitement de transition d'état. OperandActor définit une fonction de transition d'état dans le processus d'entrée de chaque état. Lors de l'initialisation, l'opérande initial est à l'état READY et l'opérande non initial est à l'état UNSCHEDULED. Si la condition spécifiée est remplie, l'opérande passe à un autre état et l'opération correspondante est effectuée. La figure suivante montre le processus de transition d'état.

image.png

Ce qui suit décrit la signification de chaque état et les opérations que Mars effectue dans ces états.

--UNSCHEDULED: état lorsque les données en amont de l'opérande ne sont pas prêtes. --READY: l'opérande est dans un état où les données en amont de cet opérande ne sont pas prêtes. Dans de tels cas, utilisez Propriétés pour modifier la valeur de Propriétés. Le planificateur envoie un message d'arrêt aux autres travailleurs et envoie un message au travailleur pour qu'il démarre l'exécution du travail. --RUNNING: l'opérande sera dans cet état au démarrage de son exécution. Lorsque cela se produit, OperatingActor vérifie si le travail a été soumis. Lorsque cela se produit, l'acteur d'exploitation vérifie si un travail a été soumis. L'acteur opérationnel enregistre ensuite le rappel auprès du travailleur et reçoit un message indiquant que le travail est terminé.

--FINISHED: lorsque l'opérande est dans cet état, un message est envoyé au GraphActor pour déterminer si le graphe entier a fini de s'exécuter. Si les opérandes sont dans cet état et qu'il n'y a pas de successeur, un message est envoyé au GraphActor pour déterminer si le graphe entier a fini de s'exécuter. En même temps, l'OperandActor envoie un message à ses précurseurs et successeurs indiquant que l'exécution est terminée. Le précurseur qui reçoit le message vérifie si tous les successeurs ont terminé l’exécution. Dans ce cas, vous pouvez libérer les données de l'opérande actuel. Lorsque le successeur reçoit le message, il vérifie si tous les précurseurs sont terminés. Si tel est le cas, vous pouvez faire passer l'état du successeur à READY. --FREED: l'opérande sera dans cet état lorsque toutes les données auront été libérées. --FATAL: l'opérande est dans cet état lorsque toutes ses données ont été libérées. L'opérande sera dans cet état si toutes les tentatives de réexécution échouent. Lorsque l'opérande est dans cet état, l'opérande passe le même état au nœud successeur. --CANCELLING: Cet état se produit lorsque l'opérande est annulé. Si l'opérande est en cours d'exécution, envoyez une demande au travailleur pour annuler l'exécution. --CANCELLED: l'état lorsque l'opérande est annulé. Il s'agit de l'état lorsque l'opérande est annulé et que l'exécution est arrêtée. Lorsque l'analyse entre dans cet état, l'acteur d'exploitation essaiera de déplacer tous les états suivants sur ANNULATION.

Contenu du travailleur

Les travailleurs de Mars incluent plusieurs processus pour atténuer l'impact de GIL au moment de l'exécution. Certaines exécutions sont effectuées dans un processus distinct. Pour réduire les copies de mémoire inutiles et la communication entre les processus, les Marsworkers utilisent la mémoire partagée pour stocker les résultats de l'exécution.

Lorsqu'un travail est soumis à un worker, il est d'abord mis en file d'attente pour l'allocation de mémoire. Lorsque la mémoire est allouée, les données sur les autres nœuds de calcul et les données sauvegardées sur le disque du nœud de calcul actuel sont rechargées en mémoire. À ce stade, toutes les données nécessaires au calcul sont déjà en mémoire et vous êtes prêt à démarrer le processus de calcul proprement dit. Une fois le calcul terminé, le travailleur place le travail dans le stockage partagé. La relation de transition entre les quatre états d'exécution est illustrée dans la figure ci-dessous.

image.png

Contrôle de l'exécution

Le worker Mars contrôle l'exécution de tous les opérateurs au sein du worker via ExecutionActor. L'acteur lui-même n'est pas impliqué dans l'opération proprement dite ou le transfert de données, il soumet simplement la tâche à d'autres acteurs.

Le planificateur OperandActor soumet le travail au worker via l'appel ʻenqueue_graphd'ExecutionActor. Le worker accepte l'entrée de l'opérande et la met en cache dans la file d'attente. À ce stade, le travailleur accepte la publication de l'opérande du travailleur et le met en cache dans la file d'attente. Lorsque l'ordonnanceur décide d'exécuter l'opérande sur le worker courant, il appelle la méthodestart_execution et enregistre le rappel via ʻadd_finish_callback. Cette conception permet de recevoir les résultats de l'exécution dans plusieurs emplacements, ce qui est utile pour la reprise après sinistre.

ExecutionActor utilise le module mars.promise pour traiter les demandes d'exécution de plusieurs opérandes en même temps. Les étapes d'exécution spécifiques sont liées via la méthode "then" de la classe Promise. Lorsque le résultat final de l'exécution est stocké, le rappel précédemment enregistré est déclenché. Si une erreur se produit dans l'une des étapes d'exécution précédentes, l'erreur est transmise à la fonction de gestionnaire enregistrée dans la méthode catch pour traitement.

Tri des opérandes

Dans de tels cas, le planificateur envoie un grand nombre d'opérandes au travailleur sélectionné. Par conséquent, pour la plupart des temps d'exécution, le nombre d'opérandes soumis au travailleur est généralement supérieur au nombre total d'opérandes que le travailleur peut gérer. L'opérateur doit trier les opérandes et sélectionner certains des opérandes à exécuter à partir d'eux. Ce processus de tri est effectué par le TaskQueueActor, qui gère une file d'attente prioritaire qui stocke des informations sur les opérandes. En même temps, le TaskQueueActor exécute périodiquement la tâche d'affectation de travail et alloue des ressources d'exécution à l'opérande le plus haut dans la file d'attente prioritaire jusqu'à ce qu'il n'y ait plus de ressources pour exécuter les opérandes. Ce processus d'allocation est également déclenché lorsqu'un nouvel opérande est soumis ou lorsque l'exécution de l'opérande est terminée.

Gestion de la mémoire

Les travailleurs de Mars gèrent deux aspects de la mémoire. La première partie est la mémoire privée de chaque processus de travail, qui appartient à chaque processus. L'autre est la mémoire partagée par tous les processus, détenue par plasma_store dans Apache Arrow. Je suis.

Pour éviter un débordement de la mémoire de processus, nous avons introduit un QuotaActor au niveau du travail qui alloue la mémoire de processus. Avant de commencer l'exécution de l'opérande, l'opérande envoie un lot de demandes de mémoire à QuotaActor pour la segmentation des entrées et des sorties. Si l'espace mémoire restant peut répondre à la demande, la demande est acceptée par QuotaActor. Sinon, la demande sera mise en file d'attente pour attendre les ressources libres. Lorsque la mémoire associée est libérée, la ressource demandée l'est également. À ce stade, QuotaActor peut allouer des ressources à d'autres opérandes.

La mémoire partagée est gérée par plasma_store et occupe généralement 50% de la mémoire totale. Puisqu'il n'y a pas de possibilité de débordement, cette partie de la mémoire est allouée directement via la méthode plasma_store associée, et non via QuotaActor. Lorsque la mémoire partagée est épuisée, l'ouvrier Mars jette les blocs inutilisés sur le disque pour tenter de libérer de l'espace pour de nouveaux blocs.

Les données par blocs vidées de la mémoire partagée vers le disque peuvent être réutilisées par les opérandes suivants, mais le rechargement des données du disque vers la mémoire partagée est particulièrement épuisé et chargé. Cela peut nécessiter beaucoup de ressources d'E / S si vous devez vider d'autres blocs sur le disque pour en accueillir un. Donc, si vous n'avez pas besoin de partage de données (par exemple, si des morceaux sont utilisés dans un seul opérande), chargez les morceaux directement dans la mémoire privée du processus au lieu de la mémoire partagée. Cela peut réduire considérablement le temps total d'exécution du travail.

Travail futur

Mars effectue actuellement une itération rapide. Nous envisageons de mettre en œuvre dans un proche avenir la prise en charge du basculement et de la lecture aléatoire au niveau du travailleur, et planifions également le basculement au niveau du planificateur.

Recommended Posts

Architecture distribuée implémentée par Mars