Stupid (traitement parallèle distribué) par cluster IPython

Single Program Multiple Data (SPMD) est une approche basique et puissante du traitement parallèle distribué. Il s'agit d'un modèle dans lequel plusieurs processeurs exécutent le même programme et chaque processeur traite des données différentes à ce moment-là. En divisant les données à grande échelle en unités qui peuvent être traitées indépendamment et en traitant les données subdivisées en parallèle par plusieurs processeurs, le temps de traitement de l'ensemble des données peut être considérablement réduit.

Par exemple, exécutez la commande job avec ʻa01.txt, ʻa02.txt, ʻa03.txt, ʻa04.txt comme fichier d'entrée, et exécutez le résultat de l'exécution (sortie) commeb01.txt. Pensez à stocker dans, b02.txt, b03txt, b04.txt. Le code suivant implémente ce processus avec un script shell bash.

#!/bin/bash
for src in $@
do
   job < $src > ${src/a/b}
done

Etant donné que les contenus de traitement dans cette boucle for sont indépendants les uns des autres, ils peuvent être facilement parallélisés et le temps de traitement global peut être raccourci.

bakapara.png

Dans le traitement du langage naturel et l'apprentissage automatique, il existe de nombreux processus qui peuvent être facilement parallélisés par division de données, tels que l'analyse morphologique et l'extraction d'éléments. Il semble qu'un tel traitement parallèle (distribué) soit appelé ** stupide ** (au Japon). Dans cet article, comment réaliser la stupidité par exécution de commandes avec cluster IPython, bibliothèque spécialisée dans la stupidité Bakapara //github.com/chokkan/bakapara) est introduit.

introduction

Selon le document officiel Présentation de l'architecture, le cluster IPython se compose des quatre éléments suivants.

  1. ** Engine **: interpréteur IPython qui exécute du code Python qui s'exécute en parallèle. Il sera lancé autant de fois que vous le souhaitez pour s'exécuter en parallèle sur l'hôte que vous souhaitez exécuter. Chaque moteur bloque les autres opérations pendant l'exécution du programme de l'utilisateur.
  2. ** Contrôleur **: Interface de commande du groupe moteur. L'utilisateur gère le fonctionnement du moteur en actionnant le contrôleur. En interne, il se compose d'un concentrateur et de plusieurs planificateurs.
  3. ** Hub **: le cœur de l'environnement d'exécution du cluster. Gère de manière centralisée les connexions au moteur, au planificateur, aux clients, aux résultats d'exécution, etc.
  4. ** Scheduler **: Gérez les travaux du moteur.

Pour créer un environnement d'exécution parallèle avec IPython, il est nécessaire de démarrer un contrôleur et plusieurs moteurs. Il existe deux façons de démarrer le contrôleur et le moteur.

  1. Démarrez automatiquement le contrôleur et le moteur avec la commande ʻipcluster`
  2. Démarrez manuellement le contrôleur avec la commande ʻipcontroller et le moteur avec la commande ʻipengine. Cette fois, je voudrais créer facilement un environnement de cluster en utilisant la commande ʻipcluster`.

Dans cet article, les éléments suivants sont considérés comme l'environnement du groupe de serveurs.

Afin de donner une explication concrète, cet article expliquera la configuration de serveur suivante à titre d'exemple.

Paramètres du cluster IPython

Créez et modifiez le fichier de paramètres en vous référant au document officiel Utilisation d'ipcluster en mode SSH. Faire. Le cluster IPython est pratique pour gérer l'environnement d'exécution du cluster dans des unités appelées ** profils **. Le nom du profil est arbitraire, mais ici nous allons créer un profil nommé "mezcal" basé sur le nom du groupe de serveurs du moteur.

$ ipython profile create --parallel --profile=mezcal
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_nbconvert_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcontroller_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipengine_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcluster_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/iplogger_config.py'

Un répertoire appelé $ HOME / .ipython / profile_ {nom du profil} est créé et le fichier de configuration du cluster IPython ʻipcluster_config.pyest créé. Ouvrez ce fichier avec un éditeur et configurez le moteur d'exécution de chaque nœud pour qu'il démarre via SSH. L'emplacement du paramètre s'affiche avec des commentaires à gauche. Soitc.IPClusterStart.engine_launcher_class être'SSH'`.

ipcluster_config.py


# The class for launching a set of Engines. Change this value to use various
# batch systems to launch your engines, such as PBS,SGE,MPI,etc. Each launcher
# class has its own set of configuration options, for making sure it will work
# in your environment.
#
# You can also write your own launcher, and specify it's absolute import path,
# as in 'mymodule.launcher.FTLEnginesLauncher`.
#
# IPython's bundled examples include:
#
#     Local : start engines locally as subprocesses [default]
#     MPI : use mpiexec to launch engines in an MPI environment
#     PBS : use PBS (qsub) to submit engines to a batch queue
#     SGE : use SGE (qsub) to submit engines to a batch queue
#     LSF : use LSF (bsub) to submit engines to a batch queue
#     SSH : use SSH to start the controller
#                 Note that SSH does *not* move the connection files
#                 around, so you will likely have to do this manually
#                 unless the machines are on a shared file system.
#     HTCondor : use HTCondor to submit engines to a batch queue
#     WindowsHPC : use Windows HPC
#
# If you are using one of IPython's builtin launchers, you can specify just the
# prefix, e.g:
#
#     c.IPClusterEngines.engine_launcher_class = 'SSH'
#
# or:
#
#     ipcluster start --engines=MPI
c.IPClusterStart.engine_launcher_class = 'SSH'

La différence avec le jeu c.IPClusterStart.engine_launcher_class précédemment est inconnue, mais le c.IPClusterEngines.engine_launcher_class est également réglé sur SSH ''. De plus, spécifiez le nom d'hôte et le nombre de moteurs (nombre d'exécutions parallèles) pour effectuer le traitement parallèle distribué dans l'objet dictionnaire c.SSHEngineSetLauncher.engines. ʻEnginesDéfinit le nom d'hôte dans la clé de l'objet dictionnaire et le nombre de moteurs dans la valeur. Voici un exemple de configuration pour démarrer 4 moteurs d'exécution chacun avecmezcal [[01-12]] .cl.ecei.tohoku.ac.jp` et effectuer jusqu'à 48 processus parallèles.

ipcluster_config.py


# The class for launching a set of Engines. Change this value to use various
# batch systems to launch your engines, such as PBS,SGE,MPI,etc. Each launcher
# class has its own set of configuration options, for making sure it will work
# in your environment.
#
# You can also write your own launcher, and specify it's absolute import path,
# as in 'mymodule.launcher.FTLEnginesLauncher`.
#
# IPython's bundled examples include:
#
#     Local : start engines locally as subprocesses [default]
#     MPI : use mpiexec to launch engines in an MPI environment
#     PBS : use PBS (qsub) to submit engines to a batch queue
#     SGE : use SGE (qsub) to submit engines to a batch queue
#     LSF : use LSF (bsub) to submit engines to a batch queue
#     SSH : use SSH to start the controller
#                 Note that SSH does *not* move the connection files
#                 around, so you will likely have to do this manually
#                 unless the machines are on a shared file system.
#     HTCondor : use HTCondor to submit engines to a batch queue
#     WindowsHPC : use Windows HPC
#
# If you are using one of IPython's builtin launchers, you can specify just the
# prefix, e.g:
#
#     c.IPClusterEngines.engine_launcher_class = 'SSH'
#
# or:
#
#     ipcluster start --engines=MPI
c.IPClusterEngines.engine_launcher_class = 'SSH'

c.SSHEngineSetLauncher.engines = {
    'mezcal01.cl.ecei.tohoku.ac.jp': 4,
    'mezcal02.cl.ecei.tohoku.ac.jp': 4,
    'mezcal03.cl.ecei.tohoku.ac.jp': 4,
    'mezcal04.cl.ecei.tohoku.ac.jp': 4,
    'mezcal05.cl.ecei.tohoku.ac.jp': 4,
    'mezcal06.cl.ecei.tohoku.ac.jp': 4,
    'mezcal07.cl.ecei.tohoku.ac.jp': 4,
    'mezcal08.cl.ecei.tohoku.ac.jp': 4,
    'mezcal09.cl.ecei.tohoku.ac.jp': 4,
    'mezcal10.cl.ecei.tohoku.ac.jp': 4,
    'mezcal11.cl.ecei.tohoku.ac.jp': 4,
    'mezcal12.cl.ecei.tohoku.ac.jp': 4,
}

Si le serveur exécutant le terminal ou le bloc-notes IPython est différent du serveur de contrôleur, vous devez autoriser les connexions d'autres serveurs au contrôleur. Si les serveurs sont situés sur un réseau local approuvé, il est pratique d'autoriser tous les hôtes à se connecter au contrôleur. ʻAjouter"--ip = '*'"aux options de démarrage d'ipcontroller` (par défaut, seuls les hôtes locaux peuvent se connecter).

ipcluster_config.py


#------------------------------------------------------------------------------
# LocalControllerLauncher configuration
#------------------------------------------------------------------------------

# Launch a controller as a regular external process.

# command-line args to pass to ipcontroller
c.LocalControllerLauncher.controller_args = ["--ip='*'", '--log-to-file', '--log-level=20']

Cette fois, le répertoire de base est partagé entre le contrôleur et l'hôte du moteur, ajoutez donc les paramètres suivants.

ipcluster_config.py


#------------------------------------------------------------------------------
# SSHLauncher configuration
#------------------------------------------------------------------------------

# A minimal launcher for ssh.
#
# To be useful this will probably have to be extended to use the ``sshx`` idea
# for environment variables.  There could be other things this needs as well.

# hostname on which to launch the program
# c.SSHLauncher.hostname = ''

# command for starting ssh
# c.SSHLauncher.ssh_cmd = ['ssh']

# user@hostname location for ssh in one setting
# c.SSHLauncher.location = ''

# List of (local, remote) files to send before starting
c.SSHLauncher.to_send = []

# command for sending files
# c.SSHLauncher.scp_cmd = ['scp']

# List of (remote, local) files to fetch after starting
c.SSHLauncher.to_fetch = []

# args to pass to ssh
# c.SSHLauncher.ssh_args = ['-tt']

# username for ssh
# c.SSHLauncher.user = ''

Pour comprendre la signification de ce paramètre, il est nécessaire de comprendre le flux jusqu'au démarrage du cluster IPython. Le flux jusqu'au démarrage du cluster IPython est affiché.

  1. Démarrez le contrôleur
  2. Transférez le fichier ʻipcontroller-engine.json` créé par le contrôleur vers l'hôte du moteur avec scp
  3. Démarrez chaque moteur Ici, si le répertoire personnel est partagé, l'étape 2 n'est pas nécessaire.

Démarrer le cluster

Exécutez la commande ʻipcluster` sur l'hôte sur lequel vous souhaitez exécuter le contrôleur et démarrez le contrôleur et le moteur ensemble. À ce stade, spécifiez le nom du profil avec l'option --profile.

$ ipcluster start --profile=mezcal
2014-12-11 14:15:49.891 [IPClusterStart] Using existing profile dir: u'/home/okazaki/.ipython/profile_mezcal'
2014-12-11 14:15:50.023 [IPClusterStart] Starting ipcluster with [daemon=False]
2014-12-11 14:15:50.025 [IPClusterStart] Creating pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid
2014-12-11 14:15:50.025 [IPClusterStart] Starting Controller with LocalControllerLauncher
2014-12-11 14:15:51.024 [IPClusterStart] Starting 48 Engines with SSH
2014-12-11 14:16:25.117 [IPClusterStart] Engines appear to have started successfully

Si «Les moteurs semblent avoir démarré avec succès» s'affiche, cela signifie que l'opération réussit. À partir du message "Démarrage de 48 moteurs avec SSH", vous pouvez confirmer que 12 $ \ fois 4 = 48 $ de moteurs ont été démarrés.

Exécutez le programme sur le cluster

«Importez le module IPython.parallel».

In [1]: from IPython.parallel import Client

Créez un objet Client pour faire fonctionner le contrôleur et le moteur. Spécifiez le nom du profil dans l'argument profile.

In [2]: rc = Client(profile='mezcal')

Si vous vérifiez l'ID du moteur auquel l'objet Client est connecté, vous pouvez confirmer qu'il est connecté à 48 moteurs.

In [3]: rc.ids
Out[3]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47]

La procédure suivante n'a rien à voir avec la stupidité, mais je vais l'expliquer comme une utilisation générale du cluster IPython. Utilisez l'instance DirectView pour exécuter du code directement sur chaque moteur sans passer par le planificateur de tâches.

In [4]: dview = rc[:]

Le code suivant calcule $ x ^ 2 $ pour $ x \ in \ {0, 1, ..., 49 \} $ (sans parallélisation).

In [5]: serial_result =  map(lambda x: x**2, range(50))

Parallélisons ce calcul pour chaque élément $ x \ in \ {0, 1, ..., 49 \} $.

In [6]: parallel_result = dview.map_sync(lambda x: x**2, range(50))

Les résultats exécutés par chaque moteur sont agrégés et stockés dans parallel_result.

In [7]: parallel_result
Out[7]:
[0,
 1,
 4,
 9,
 ...
 2401]

Bien entendu, le résultat du calcul est le même quelle que soit la présence ou l'absence de parallélisation.

In [8]: serial_result == parallel_result
Out[8]: True

Le décorateur remote vous permet de définir une fonction (fonction remote) à exécuter par chaque moteur. La fonction suivante gethostname est une fonction qui obtient et renvoie le nom d'hôte avec socket.getfqdn (), mais notez que le module socket est importé dans la fonction. L'importation d'un module côté client ne signifie pas que le processus IPython sur le moteur a importé le module, il est donc nécessaire d'importer le module à l'intérieur de la fonction.

In [9]: @dview.remote(block=True)
   ...: def gethostname():
   ...:     import socket
   ...:     return socket.getfqdn()
   ...:

Vous pouvez obtenir le nom d'hôte de chaque moteur en appelant la fonction gethostname sur le client. L'ordre est en panne, mais vous pouvez voir que chacun des quatre moteurs fonctionne sur les hôtes de mezcal01 à mezcal12.

In [10]: gethostname()
Out[10]: 
['mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 ...
 'mezcal01.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp']

Il existe également des décorateurs «parallèles» qui définissent des fonctions qui s'exécutent en parallèle. Pour plus de détails, reportez-vous à Interface directe d'IPython.

Exécution via le planificateur

L'instance LoadBalancedView exécute les travaux en utilisant la répartition dynamique de la charge. Vous ne pourrez pas accéder directement aux moteurs individuels, mais vous pouvez implémenter les files d'attente de travaux sur le moteur de cluster, comme les files d'attente de travaux dans multiprocessing.Pool.

À titre d'exemple simple, exécutons la commande «sleep 10» sur chaque moteur. Créez une liste contenant les travaux que vous souhaitez exécuter.

In [11]: jobs = [dict(cmd='sleep 10') for i in range(100)]

Chaque élément de cette liste est de type dictionnaire et stocke la commande à exécuter dans la valeur de la clé cmd. Cette fois, sleep 10 est exécuté pour tous les jobs, mais en faisant de la stupidité, le contenu de la commande devrait changer en fonction des données d'entrée. Les 5 premiers emplois ressemblent à ceci.

In [12]: jobs[:5]
Out[12]:
[{'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'}]

Implémentez la fonction runjob qui exécute le travail (commande) représenté par l'objet dictionnaire. La valeur de la clé cmd de l'objet dictionnaire reçu est exécutée sur le shell, et la valeur de retour est stockée dans l'objet dictionnaire et renvoyée.

In [13]: def runjob(job):
   ....:     import subprocess
   ....:     try:
   ....:         returncode = subprocess.call(job['cmd'], shell=True)
   ....:         return dict(code=returncode)
   ....:     except OSError, e:
   ....:         return dict(error=str(e))
   ....:

Afin d'exécuter cette fonction runjob de manière séquentielle dans la file d'attente des travaux, récupérez une instanceLoadBalancedView du client.

In [14]: lview = rc.load_balanced_view()

Ensuite, la fonction runjob est exécutée de manière asynchrone pour chaque élément de la liste jobs.

In [15]: ar = lview.map_async(runjob, jobs)

L'exécution de ce code n'est pas bloquée et un objet ʻAsyncResult immédiat est renvoyé. Vous pouvez vérifier le résultat de l'exécution du travail et l'état d'avancement via cet objet ʻAsyncResult. Par exemple, affichons l'état d'exécution du travail sur le shell interactif (également disponible sur le notebook IPython).

In [16]: ar.wait_interactive()
  48/100 tasks finished after   15 s

Lorsque cette fonction wait_interactive est appelée, l'état d'exécution du travail est affiché sur le shell toutes les secondes. L'affichage ci-dessus indique que 48 travaux sur 100 ont été terminés 15 secondes après le début de l'exécution du travail. Étant donné que le temps requis pour un travail est de 10 secondes et que 48 moteurs sont utilisés en même temps, 48 travaux sont terminés en 10 secondes à compter du début de l'exécution, et 48 autres travaux sont terminés entre 10 et 20 secondes. Sera exécuté. Lorsque tous les travaux sont terminés, ce qui suit s'affiche.

In [16]: ar.wait_interactive()
 100/100 tasks finished after   30 s
done

Chaque travail est exécuté de manière asynchrone, mais la fonction wait_interactive ne se termine pas tant que tous les travaux ne sont pas terminés. Si vous voulez arrêter d'afficher la progression du travail, vous pouvez l'interrompre avec [Ctrl] + [c]("Kernel" - "Interrupt" pour le notebook IPython). Même si l'affichage est interrompu, l'exécution du travail se poursuit.

Vous pouvez vérifier le résultat de l'exécution du travail à partir de l'objet ʻAsyncResult`. Vérifions les résultats d'exécution des 5 premiers jobs.

In [17]: ar[:5]
Out[17]: [{'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}]

Lorsque «code» est «0», cela signifie que la valeur de retour du shell («/ bin / sh») qui a exécuté la commande «sleep» était «0».

Module Bakapara

Bakapara est une modularisation du processus d'exécution d'une file d'attente de travaux par une commande sur le shell sur un cluster IPython. Il est conçu pour être utilisé à partir du shell interactif de Python et du notebook IPython. Il peut également être utilisé indépendamment de la ligne de commande.

Installation

Si vous voulez l'utiliser comme module, téléchargez bakapara.py et placez-le dans le répertoire où passe PYTHONPATH. .. Lors de l'exécution à partir de la ligne de commande, il est pratique de le placer dans un répertoire qui se trouve dans le PATH.

Spécifications du travail

L'objet Bakapara reçoit une liste de travaux et les exécute sur le moteur de cluster. Chaque travail peut être dans n'importe quel format à condition qu'il s'agisse d'un objet de dictionnaire répondant aux spécifications suivantes.

Clé valeur Exemple
cmd La commande que vous souhaitez exécuter sur le moteur. Puisque la commande est en fait exécutée via le shell, des tubes et des redirections peuvent également être utilisés. wc -l /work/001.txt
cd (Facultatif) Répertoire de travail lorsque le moteur exécute des commandes. Si cette clé n'existe pas, elle ne modifiera pas le répertoire de travail du moteur. Le répertoire de travail de chaque moteur est initialisé avec le répertoire de travail lors de la construction de l'objet Bakapara. /home/okazaki/projects/bakapara
out (Facultatif) Si vous spécifiez un nom de fichier pour cette valeur, la sortie standard lorsque la commande est exécutée est enregistrée dans le fichier. Si cette clé n'existe pas, le contenu de la sortie standard ne sera pas enregistré. /work/001.txt.out
err (Facultatif) Si vous spécifiez un nom de fichier pour cette valeur, la sortie d'erreur standard lorsque la commande est exécutée est enregistrée dans le fichier. Si cette clé n'existe pas, le contenu de la sortie d'erreur standard ne sera pas enregistré. /work/001.txt.err
host (Facultatif) Une liste d'hôtes (moteurs) qui peuvent exécuter des commandes. Si cette clé n'existe pas, le travail est considéré comme exécutable sur tous les moteurs. ['mezcal03.cl.ecei.tohoku.ac.jp',]

host est utilisé lorsque vous souhaitez exécuter une tâche spécifique sur un hôte spécifique. Par exemple, si les données que vous souhaitez traiter sont distribuées sur le disque local de chaque serveur, vous pouvez spécifier l'hôte sur lequel se trouvent les données requises pour exécuter le travail avec host. De plus, le système de fichiers distribué GFarm vous permet de vérifier sur quel hôte se trouve chaque fichier du système de fichiers distribué, afin de traiter En spécifiant l'hôte sur lequel se trouvent les données, un traitement parallèle distribué qui prend en compte la localité des données telle que HDFS + Hadoop peut être réalisé.

Le résultat de l'exécution de chaque travail est stocké (écrasé) comme la valeur de la clé result de l'objet dictionnaire. La valeur de result est un objet dictionnaire avec les spécifications suivantes.

Clé valeur Exemple
code Code de sortie 0
submitted Date et heure auxquelles le travail a été soumis par le client '2014-12-13T01:17:05.593718'
started La date et l'heure auxquelles le travail a commencé à s'exécuter sur le moteur '2014-12-13T01:17:04.559970'
completed Date et heure auxquelles l'exécution du travail s'est terminée sur le moteur '2014-12-13T01:17:14.566251'
received Date et heure auxquelles le client a reçu le résultat de l'exécution du travail '2014-12-13T01:17:15.614301'
elapsed Temps requis pour exécuter le travail (completed-started '0:00:10.006281'
engine_id ID (index) du moteur qui a exécuté le travail 3
host Nom d'hôte du moteur qui a exécuté le travail 'mezcal06.cl.ecei.tohoku.ac.jp'
pyerr Exceptions Python (le cas échéant). Lorsque vous spécifiez le nom d'hôte pour exécuter le travailUnmetDependencyUne exception peut s'afficher, mais c'est normal. None
pyout Sortie de l'interpréteur Python (le cas échéant) None
status 'ok'Ou'error' 'ok'
msg_id UUID des messages échangés entre le client et le moteur u'48fbc58b-ef73-4815-9f32-1932c01421ad'
error (Existe uniquement lorsqu'une erreur fatale se produit) Message d'erreur ''

Notez qu'il existe des cas où l'exécution de la commande échoue même si "status" est "ok". Par exemple, si vous faites une erreur en écrivant la commande cmd et que vous ne pouvez pas l'exécuter, status sera `` ok', mais la sortie d'erreur standard sera '/ bin / bash: 1: hoge: not found. Un message comme \ n'reste. La commande de travail est exécutée via/ bin / bash -o pipe fail`. Par conséquent, si l'une des commandes acheminées renvoie un code d'état autre que «0», la valeur de retour sera stockée dans «code». Par conséquent, il est important de vérifier la valeur de retour «code».

Utiliser sur le shell interactif Python

Tout d'abord, importez le module bakapara et créez une instance Bakapara. La spécification du constructeur de la classe Bakapara est [IPython.parallel.Client](http://ipython.org/ipython-doc/dev/api/generated/IPython.parallel.client.client.html#IPython.parallel.client] .client.Client) est le même. Normalement, vous spécifiez le nom du profil comme suit.

In [1]: from bakapara import Bakapara

In [2]: bp = Bakapara(profile='mezcal')

Créez une liste de travaux que vous souhaitez exécuter. Le code suivant crée un travail qui exécute la commande «sleep 10» 100 fois.

In [3]: jobs = [dict(cmd='sleep 10') for i in range(100)]

Par souci d'explication, vérifions le premier travail.

In [4]: jobs[0]
Out[4]: {'cmd': 'sleep 10'}

Utilisez la méthode run pour démarrer le travail. La valeur de retour «True» indique que l'enregistrement du travail a réussi.

In [5]: bp.run(jobs)
Out[5]: True

Vous pouvez vérifier la progression du travail avec la méthode status. La progression du travail est affichée toutes les secondes.

In [6]: bp.status()
48/100 tasks finished after 0:00:12.554905

Cette méthode status ne termine pas son exécution tant que tous les jobs ne sont pas terminés. Si vous voulez faire autre chose, appuyez sur [Ctrl] + [c]("Kernel" - "Interrupt" dans le notebook IPython) pour interrompre.

Une fois l'exécution du travail terminée, les informations suivantes s'affichent.

In [7]: bp.status()
100/100 tasks finished after 0:00:30.137117
100 tasks completed in 0:00:30.137117

Lorsque le travail est terminé, le résultat est écrit dans la liste des travaux (la liste des travaux donnée à la méthode run de la classe Bakapara). Pour être précis, le résultat de l'exécution est écrit dans la liste jobs en appelant la méthode status ou la méthode wait une fois le travail terminé. Même si le travail entier n'est pas terminé, les résultats jusqu'au moment où la méthode status ou la méthode wait est appelée sont écrits dans la liste des travaux. Pour obtenir le résultat de l'exécution du travail, appelez la méthode status et vérifiez que le message" tâches xx / xxx terminées "s'affiche.

Vérifions le résultat de l'exécution. Vous pouvez accéder à des informations telles que le nom d'hôte qui a exécuté le travail, l'heure requise et le code de fin.

In [8]: jobs[0]
Out[8]:
{'cmd': 'sleep 10',
 'result': {'code': 0,
  'completed': '2014-12-13T12:26:12.665525',
  'elapsed': '0:00:10.005647',
  'engine_id': 11,
  'host': 'mezcal11.cl.ecei.tohoku.ac.jp',
  'msg_id': u'72666439-9364-4a37-9dfb-8a04921d9a0c',
  'pyerr': None,
  'pyout': None,
  'received': '2014-12-13T12:24:38.638917',
  'started': '2014-12-13T12:26:02.659878',
  'status': u'ok',
  'submitted': '2014-12-13T12:23:58.320781'}}

Utiliser à partir de la ligne de commande

Comme nous l'avons vu, l'utilisation du module Bakapara est facile. Cependant, même si je ne comprends pas du tout Python, je pense qu'il y a des gens qui veulent effectuer un traitement parallèle distribué stupide. Nous avons également préparé une interface de ligne de commande pour ces personnes. Lorsque bakapara.py est exécuté seul, le JSON du travail est lu à partir de l'entrée standard et le JSON du résultat de l'exécution du travail est écrit dans la sortie standard. Le JSON d'entrée et de sortie est un travail par ligne, et le format est adopté dans lequel l'objet dictionnaire du travail est décrit en JSON sur chaque ligne. La raison pour laquelle le travail entier n'est pas stocké dans le format de liste est que le résultat de l'exécution peut être écrit dès que le travail est terminé.

Par exemple, le fichier JSON d'un travail qui exécute 100 sleep 10s est le suivant.

$ head -n5 sleep.task.json
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}

Pour exécuter ce travail et écrire le résultat de l'exécution dans sleep.result.json, exécutez la commande suivante. Le nom de profil du cluster IPython est spécifié avec l'option -p.

$ python bakapara.py -p mezcal < sleep.task.json > sleep.result.json
2014-12-21 10:39:49,231 total 100 jobs on 48 engines
2014-12-21 10:39:59,288 [1/100] returned 0 in 0:00:10.007444 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [2/100] returned 0 in 0:00:10.005645 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [3/100] returned 0 in 0:00:10.005994 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [4/100] returned 0 in 0:00:10.006593 sec on mezcal01.cl.ecei.tohoku.ac.jp: sleep 10
...
2014-12-21 10:40:19,282 [97/100] returned 0 in 0:00:10.005299 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [98/100] returned 0 in 0:00:10.005097 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [99/100] returned 0 in 0:00:10.005758 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [100/100] returned 0 in 0:00:10.004995 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 completed

À partir de la sortie standard, le travail exécuté est renvoyé sous la forme d'un objet dictionnaire (encodé en JSON). Vous pouvez voir que le résultat de l'exécution est stocké dans la clé result.

$ head -n5 sleep.result.json
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262566", "code": 0, "engine_id": 9, "started": "2014-12-21T10:40:46.649199", "completed": "2014-12-21T10:40:56.656643", "msg_id": "22d664c5-793a-44f1-b29d-c74f2aa434c1", "submitted": "2014-12-21T10:39:49.235879", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.007444", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262205", "code": 0, "engine_id": 11, "started": "2014-12-21T10:40:46.650998", "completed": "2014-12-21T10:40:56.656643", "msg_id": "e8eb5db2-ac9b-481b-b0a4-fdb2ef15be62", "submitted": "2014-12-21T10:39:49.236327", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005645", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.287773", "code": 0, "engine_id": 8, "started": "2014-12-21T10:40:46.679033", "completed": "2014-12-21T10:40:56.685027", "msg_id": "8a7e6fe0-482a-4ae0-a2ff-8321849aa8b0", "submitted": "2014-12-21T10:39:49.244347", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005994", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.284039", "code": 0, "engine_id": 46, "started": "2014-12-21T10:40:46.698136", "completed": "2014-12-21T10:40:56.704729", "msg_id": "f03f9b93-4a60-494b-9a21-625cdcac252e", "submitted": "2014-12-21T10:39:49.242042", "pyerr": null, "host": "mezcal01.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006593", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.259553", "code": 0, "engine_id": 28, "started": "2014-12-21T10:40:46.889807", "completed": "2014-12-21T10:40:56.895995", "msg_id": "bc9e7b74-64ba-45f4-ac0e-31b27db5d862", "submitted": "2014-12-21T10:39:49.234939", "pyerr": null, "host": "mezcal07.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006188", "pyout": null}}

À propos de l'interruption de travail

En examinant les résultats de l'exécution des travaux exécutés sur le cluster, vous pouvez remarquer que le contenu du travail est incorrect ou que le travail est hors de contrôle. Dans ce cas, l'exécution du travail sera annulée. Il existe deux façons d'annuler une tâche exécutée dans Bakapara.

  1. Appelez la méthode ʻabort () `.
  2. Arrêtez le processus ʻipcluster` et tuez tous les moteurs et contrôleurs.

Dans le cas de la méthode 1, le travail en cours n'est pas annulé et seul le travail dans la file d'attente des travaux est annulé. Par conséquent, il ne peut pas être utilisé lorsqu'un travail qui prend beaucoup de temps est exécuté ou lorsque le travail est hors de contrôle.

La méthode 2 est une méthode d'arrêt forcé de la tâche en cours d'exécution en arrêtant le moteur et le contrôleur de l'environnement d'exécution du cluster. Plus précisément, vous appuierez sur les touches [Ctrl] + [c] de la console où ʻipcluster` est exécuté.

^C2014-12-14 17:16:40.729 [IPClusterStart] ERROR | IPython cluster: stopping
2014-12-14 17:16:40.730 [IPClusterStart] Stopping Engines...
2014-12-14 17:16:43.734 [IPClusterStart] Removing pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid

Malheureusement, la version actuelle de l'interface LoadBalancedView ne fournit pas un moyen d'arrêter un travail en cours (voir Ressources: [[IPython-dev] Fwd: interrompre / abandonner les travaux parallèles](http: /). /mail.scipy.org/pipermail/ipython-dev/2014-March/013426.html)). En cas d'urgence, il est nécessaire de redémarrer ʻipcluster` lui-même comme dans la méthode 2.

en conclusion

Quand j'ai commencé le traitement parallèle distribué dans un environnement de cluster (vers 2005), j'ai utilisé GXP Grid & Cluster Shell. C'était. GXP a la capacité d'exécuter des flux de travail comme make, mais je l'ai utilisé exclusivement à des fins stupides. C'est un outil utile, mais il semble que le développement soit actuellement arrêté. GNU Parallal peut avoir été suffisant à cette fin.

Vers 2011, j'avais sérieusement envisagé d'utiliser Hadoop, mais il était difficile de gérer et d'exploiter Hadoop au niveau du laboratoire de l'université. De plus, afin de paralléliser un petit programme implémenté en Python avec stupidité, un travail supplémentaire pour Hadoop est nécessaire, donc j'ai senti que l'obstacle était élevé pour les étudiants. Cependant, j'ai trouvé le système de fichiers distribué (HDFS) très pratique. Par conséquent, j'ai commencé à utiliser GFarm comme système de fichiers distribué. Afin de traiter les données placées sur le système de fichiers distribué en tenant compte de la localité, nous avons implémenté notre propre système de file d'attente de travaux en utilisant Paramiko. L'épave du système de file d'attente des travaux est gfqsub.

Récemment, cependant, des expériences et des travaux d'analyse de données ont commencé à être effectués sur IPython notebook. Le bloc-notes IPython est très pratique, mais sa compatibilité avec les outils de ligne de commande n'est pas bonne, j'ai donc cherché une bibliothèque Python qui se rend compte de la stupidité. Quand je l'ai recherché, le cluster Python prenait en charge le traitement parallèle distribué, mais il y avait peu d'informations autres que la documentation officielle, j'ai donc créé Bakapara en lisant moi-même la documentation.

Cette fois, j'ai réalisé la stupidité en utilisant SSH, mais le cluster IPython semble être capable d'utiliser MPI, PBS (qsub), Windows HPC Server, Amazon EC2 de manière unifiée en définissant ʻipcluster_config.py`. Peut-être que le module Bakapara fonctionnera toujours dans ces environnements (je ne l'ai pas testé moi-même). De plus, le cluster IPython peut réaliser un traitement parallèle distribué assez avancé tel que l'échange de données de code entre le contrôleur et le moteur. Je voudrais continuer à explorer le genre de choses intéressantes qui peuvent être faites en utilisant ces fonctions.

Recommended Posts

Stupid (traitement parallèle distribué) par cluster IPython
Traitement distribué Python Spartan
Introduction au traitement parallèle distribué Python par Ray
Traitement parallèle avec multitraitement
Calcul parallèle avec le notebook iPython
Traitement parallèle avec des fonctions locales
Traitement parallèle Blender Modal Operator
Traitement parallèle avec Parallel de scikit-learn