Dumm (verteilte Parallelverarbeitung) durch IPython-Cluster

Single Program Multiple Data (SPMD) ist ein grundlegender und leistungsstarker Ansatz für die verteilte Parallelverarbeitung. Dies ist ein Modell, bei dem mehrere Prozessoren dasselbe Programm ausführen und jeder Prozessor zu diesem Zeitpunkt unterschiedliche Daten verarbeitet. Durch die Aufteilung großer Datenmengen in Einheiten, die unabhängig voneinander verarbeitet werden können, und die parallele Verarbeitung der unterteilten Daten durch mehrere Prozessoren kann die Verarbeitungszeit der gesamten Daten erheblich reduziert werden.

Führen Sie beispielsweise den Befehl "job" mit "a01.txt", "a02.txt", "a03.txt" und "a04.txt" als Eingabedateien aus und führen Sie die Ausführungsergebnisse (Ausgabe) als "b01.txt" aus Erwägen Sie das Speichern in , b02.txt, b03txt, b04.txt`. Der folgende Code implementiert diesen Prozess mit einem Bash-Shell-Skript.

#!/bin/bash
for src in $@
do
   job < $src > ${src/a/b}
done

Da die Verarbeitungsinhalte in dieser for-Schleife unabhängig voneinander sind, können sie leicht parallelisiert und die Gesamtverarbeitungszeit verkürzt werden.

bakapara.png

In der Verarbeitung natürlicher Sprache und beim maschinellen Lernen gibt es viele Prozesse, die durch Datenteilung leicht parallelisiert werden können, wie z. B. morphologische Analyse und Elementextraktion. Es scheint, dass eine solche (verteilte) Parallelverarbeitung (in Japan) als ** dumm ** bezeichnet wird. In diesem Artikel erfahren Sie, wie Sie Dummheit durch Befehlsausführung mit IPython-Cluster realisieren, einer auf Dummheit spezialisierten Bibliothek Bakapara. //github.com/chokkan/bakapara) wird eingeführt.

Einführung

Gemäß dem offiziellen Dokument Architekturübersicht besteht der IPython-Cluster aus den folgenden vier Elementen.

  1. ** Engine **: IPython-Interpreter, der parallel ausgeführten Python-Code ausführt. Es wird so oft gestartet, wie Sie parallel auf dem Host ausführen möchten, den Sie ausführen möchten. Jede Engine blockiert andere Vorgänge, während das Programm des Benutzers ausgeführt wird.
  2. ** Controller **: Schnittstelle zum Betrieb der Motorgruppe. Der Benutzer verwaltet den Betrieb des Motors durch Betätigen der Steuerung. Intern besteht es aus einem Hub und mehreren Schedulern.
  3. ** Hub **: Das Herzstück der Cluster-Ausführungsumgebung. Verwaltet zentral Verbindungen zur Engine, zum Scheduler, zu Clients, zu Ausführungsergebnissen usw.
  4. ** Scheduler **: Verwalten von Engine-Jobs.

Um eine parallele Ausführungsumgebung mit IPython zu erstellen, müssen ein Controller und mehrere Engines gestartet werden. Es gibt zwei Möglichkeiten, die Steuerung und den Motor zu starten.

  1. Starten Sie den Controller und die Engine automatisch mit dem Befehl ipcluster
  2. Starten Sie den Controller manuell mit dem Befehl "ipcontroller" und die Engine mit dem Befehl "ipengine". Dieses Mal möchte ich einfach eine Clusterumgebung mit dem Befehl ipcluster erstellen.

In diesem Artikel werden die folgenden Elemente als Umgebung der Servergruppe angenommen.

Um eine konkrete Erklärung zu geben, wird in diesem Artikel die folgende Serverkonfiguration als Beispiel erläutert.

IPython-Cluster-Einstellungen

Erstellen und bearbeiten Sie die Einstellungsdatei unter Bezugnahme auf das offizielle Dokument Verwenden von ipcluster im SSH-Modus. Machen. IPython-Cluster ist praktisch, um die Cluster-Ausführungsumgebung in Einheiten zu verwalten, die als ** Profile ** bezeichnet werden. Der Name des Profils ist beliebig, aber hier erstellen wir ein Profil mit dem Namen "mezcal" basierend auf dem Namen der Servergruppe der Engine.

$ ipython profile create --parallel --profile=mezcal
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_nbconvert_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcontroller_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipengine_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcluster_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/iplogger_config.py'

Ein Verzeichnis mit dem Namen "$ HOME / .ipython / profile_ {Profilname}" wird erstellt, und die IPython-Cluster-Konfigurationsdatei "ipcluster_config.py" wird erstellt. Öffnen Sie diese Datei mit einem Editor und stellen Sie die Ausführungs-Engine jedes Knotens so ein, dass sie über SSH gestartet wird. Der Einstellungsort wird mit Kommentaren angezeigt. Sei "c.IPClusterStart.engine_launcher_class" "SSH".

ipcluster_config.py


# The class for launching a set of Engines. Change this value to use various
# batch systems to launch your engines, such as PBS,SGE,MPI,etc. Each launcher
# class has its own set of configuration options, for making sure it will work
# in your environment.
#
# You can also write your own launcher, and specify it's absolute import path,
# as in 'mymodule.launcher.FTLEnginesLauncher`.
#
# IPython's bundled examples include:
#
#     Local : start engines locally as subprocesses [default]
#     MPI : use mpiexec to launch engines in an MPI environment
#     PBS : use PBS (qsub) to submit engines to a batch queue
#     SGE : use SGE (qsub) to submit engines to a batch queue
#     LSF : use LSF (bsub) to submit engines to a batch queue
#     SSH : use SSH to start the controller
#                 Note that SSH does *not* move the connection files
#                 around, so you will likely have to do this manually
#                 unless the machines are on a shared file system.
#     HTCondor : use HTCondor to submit engines to a batch queue
#     WindowsHPC : use Windows HPC
#
# If you are using one of IPython's builtin launchers, you can specify just the
# prefix, e.g:
#
#     c.IPClusterEngines.engine_launcher_class = 'SSH'
#
# or:
#
#     ipcluster start --engines=MPI
c.IPClusterStart.engine_launcher_class = 'SSH'

Der Unterschied zum zuvor festgelegten Satz "c.IPClusterStart.engine_launcher_class" ist nicht bekannt, aber der "c.IPClusterEngines.engine_launcher_class" ist auch auf "SSH" festgelegt. Geben Sie außerdem den Hostnamen und die Anzahl der Engines (Anzahl der parallelen Ausführungen) für die verteilte parallele Verarbeitung im Wörterbuchobjekt "c.SSHEngineSetLauncher.engines" an. Legen Sie den Hostnamen im Schlüssel des Wörterbuchobjekts engine und die Anzahl der Engines im Wert fest. Hier ist ein Einstellungsbeispiel zum Starten von 4 Ausführungs-Engines mit jeweils mezcal [[01-12]] .cl.ecei.tohoku.ac.jp und zum Ausführen von bis zu 48 parallelen Prozessen.

ipcluster_config.py


# The class for launching a set of Engines. Change this value to use various
# batch systems to launch your engines, such as PBS,SGE,MPI,etc. Each launcher
# class has its own set of configuration options, for making sure it will work
# in your environment.
#
# You can also write your own launcher, and specify it's absolute import path,
# as in 'mymodule.launcher.FTLEnginesLauncher`.
#
# IPython's bundled examples include:
#
#     Local : start engines locally as subprocesses [default]
#     MPI : use mpiexec to launch engines in an MPI environment
#     PBS : use PBS (qsub) to submit engines to a batch queue
#     SGE : use SGE (qsub) to submit engines to a batch queue
#     LSF : use LSF (bsub) to submit engines to a batch queue
#     SSH : use SSH to start the controller
#                 Note that SSH does *not* move the connection files
#                 around, so you will likely have to do this manually
#                 unless the machines are on a shared file system.
#     HTCondor : use HTCondor to submit engines to a batch queue
#     WindowsHPC : use Windows HPC
#
# If you are using one of IPython's builtin launchers, you can specify just the
# prefix, e.g:
#
#     c.IPClusterEngines.engine_launcher_class = 'SSH'
#
# or:
#
#     ipcluster start --engines=MPI
c.IPClusterEngines.engine_launcher_class = 'SSH'

c.SSHEngineSetLauncher.engines = {
    'mezcal01.cl.ecei.tohoku.ac.jp': 4,
    'mezcal02.cl.ecei.tohoku.ac.jp': 4,
    'mezcal03.cl.ecei.tohoku.ac.jp': 4,
    'mezcal04.cl.ecei.tohoku.ac.jp': 4,
    'mezcal05.cl.ecei.tohoku.ac.jp': 4,
    'mezcal06.cl.ecei.tohoku.ac.jp': 4,
    'mezcal07.cl.ecei.tohoku.ac.jp': 4,
    'mezcal08.cl.ecei.tohoku.ac.jp': 4,
    'mezcal09.cl.ecei.tohoku.ac.jp': 4,
    'mezcal10.cl.ecei.tohoku.ac.jp': 4,
    'mezcal11.cl.ecei.tohoku.ac.jp': 4,
    'mezcal12.cl.ecei.tohoku.ac.jp': 4,
}

Wenn sich der Server, auf dem das Terminal oder IPython-Notebook ausgeführt wird, vom Controller-Server unterscheidet, müssen Sie Verbindungen von anderen Servern zum Controller zulassen. Wenn sich die Server in einem vertrauenswürdigen LAN befinden, können alle Hosts bequem eine Verbindung zum Controller herstellen. Fügen Sie "" -ip = "*" zu den Startoptionen von "ipcontroller" hinzu (standardmäßig können nur lokale Hosts eine Verbindung herstellen).

ipcluster_config.py


#------------------------------------------------------------------------------
# LocalControllerLauncher configuration
#------------------------------------------------------------------------------

# Launch a controller as a regular external process.

# command-line args to pass to ipcontroller
c.LocalControllerLauncher.controller_args = ["--ip='*'", '--log-to-file', '--log-level=20']

Dieses Mal wird das Home-Verzeichnis von der Steuerung und dem Engine-Host gemeinsam genutzt. Fügen Sie daher die folgenden Einstellungen hinzu.

ipcluster_config.py


#------------------------------------------------------------------------------
# SSHLauncher configuration
#------------------------------------------------------------------------------

# A minimal launcher for ssh.
#
# To be useful this will probably have to be extended to use the ``sshx`` idea
# for environment variables.  There could be other things this needs as well.

# hostname on which to launch the program
# c.SSHLauncher.hostname = ''

# command for starting ssh
# c.SSHLauncher.ssh_cmd = ['ssh']

# user@hostname location for ssh in one setting
# c.SSHLauncher.location = ''

# List of (local, remote) files to send before starting
c.SSHLauncher.to_send = []

# command for sending files
# c.SSHLauncher.scp_cmd = ['scp']

# List of (remote, local) files to fetch after starting
c.SSHLauncher.to_fetch = []

# args to pass to ssh
# c.SSHLauncher.ssh_args = ['-tt']

# username for ssh
# c.SSHLauncher.user = ''

Um die Bedeutung dieser Einstellung zu verstehen, muss der Ablauf verstanden werden, bis der IPython-Cluster gestartet wird. Der Ablauf bis zum Start des IPython-Clusters wird angezeigt.

  1. Starten Sie die Steuerung
  2. Übertragen Sie die vom Controller erstellte "ipcontroller-engine.json" mit scp auf den Engine-Host.
  3. Starten Sie jeden Motor Wenn das Home-Verzeichnis freigegeben ist, ist Schritt 2 hier nicht erforderlich.

Cluster starten

Führen Sie den Befehl "ipcluster" auf dem Host aus, auf dem Sie den Controller ausführen möchten, und starten Sie den Controller und die Engine zusammen. Geben Sie zu diesem Zeitpunkt den Profilnamen mit der Option --profile an.

$ ipcluster start --profile=mezcal
2014-12-11 14:15:49.891 [IPClusterStart] Using existing profile dir: u'/home/okazaki/.ipython/profile_mezcal'
2014-12-11 14:15:50.023 [IPClusterStart] Starting ipcluster with [daemon=False]
2014-12-11 14:15:50.025 [IPClusterStart] Creating pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid
2014-12-11 14:15:50.025 [IPClusterStart] Starting Controller with LocalControllerLauncher
2014-12-11 14:15:51.024 [IPClusterStart] Starting 48 Engines with SSH
2014-12-11 14:16:25.117 [IPClusterStart] Engines appear to have started successfully

Wenn "Motoren scheinen erfolgreich gestartet zu sein" angezeigt wird, ist dies erfolgreich. In der Meldung "Starten von 48 Engines mit SSH" können Sie bestätigen, dass $ 12 \ times 4 = 48 $ von Engines gestartet wurden.

Führen Sie das Programm auf dem Cluster aus

Importieren Sie das Modul "IPython.parallel".

In [1]: from IPython.parallel import Client

Erstellen Sie ein Client-Objekt, um die Steuerung und die Engine zu betreiben. Geben Sie den Profilnamen im Argument profile an.

In [2]: rc = Client(profile='mezcal')

Wenn Sie die ID der Engine überprüfen, mit der das Client-Objekt verbunden ist, können Sie bestätigen, dass es mit 48 Engines verbunden ist.

In [3]: rc.ids
Out[3]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47]

Das folgende Verfahren hat nichts mit Dummheit zu tun, aber ich werde es als allgemeine Verwendung des IPython-Clusters erklären. Verwenden Sie die DirectView-Instanz, um Code direkt auf jeder Engine auszuführen, ohne den Taskplaner zu durchlaufen.

In [4]: dview = rc[:]

Der folgende Code berechnet $ x ^ 2 $ für $ x \ in \ {0, 1, ..., 49 \} $ (ohne Parallelisierung).

In [5]: serial_result =  map(lambda x: x**2, range(50))

Lassen Sie uns diese Berechnung für jedes Element $ x \ in \ {0, 1, ..., 49 \} $ parallelisieren.

In [6]: parallel_result = dview.map_sync(lambda x: x**2, range(50))

Die von jeder Engine ausgeführten Ergebnisse werden aggregiert und in "parallel_result" gespeichert.

In [7]: parallel_result
Out[7]:
[0,
 1,
 4,
 9,
 ...
 2401]

Selbstverständlich ist das Berechnungsergebnis unabhängig vom Vorhandensein oder Fehlen einer Parallelisierung das gleiche.

In [8]: serial_result == parallel_result
Out[8]: True

Mit dem "Remote" -Dekorator können Sie eine Funktion (Remote-Funktion) definieren, die von jeder Engine ausgeführt werden soll. Die folgende Funktion "gethostname" ist eine Funktion, die den Hostnamen mit "socket.getfqdn ()" abruft und zurückgibt. Beachten Sie jedoch, dass das Modul "socket" in die Funktion importiert wird. Das Importieren eines Moduls auf der Clientseite bedeutet nicht, dass der IPython-Prozess auf der Engine das Modul importiert hat. Daher muss das Modul innerhalb der Funktion importiert werden.

In [9]: @dview.remote(block=True)
   ...: def gethostname():
   ...:     import socket
   ...:     return socket.getfqdn()
   ...:

Sie können den Hostnamen jeder Engine abrufen, indem Sie die Funktion "gethostname" auf dem Client aufrufen. Die Reihenfolge ist nicht in Ordnung, aber Sie können sehen, dass jede der vier Engines auf den Hosts von mezcal01 bis mezcal12 ausgeführt wird.

In [10]: gethostname()
Out[10]: 
['mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 ...
 'mezcal01.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp']

Es gibt auch "parallele" Dekoratoren, die Funktionen definieren, die parallel ausgeführt werden. Weitere Informationen finden Sie unter IPython Direct-Schnittstelle.

Ausführung über Scheduler

Die LoadBalancedView-Instanz führt Jobs mithilfe der dynamischen Lastverteilung aus. Sie können nicht direkt auf die einzelnen Engines zugreifen, aber Sie können die Jobwarteschlangen in der Cluster-Engine implementieren, wie die Jobwarteschlangen in multiprocessing.Pool.

Lassen Sie uns als einfaches Beispiel den Befehl "sleep 10" für jede Engine ausführen. Erstellen Sie eine Liste mit den Jobs, die Sie ausführen möchten.

In [11]: jobs = [dict(cmd='sleep 10') for i in range(100)]

Jedes Element dieser Liste ist vom Wörterbuchtyp und speichert den auszuführenden Befehl im Wert des Schlüssels cmd. Dieses Mal wird "Schlaf 10" für alle Jobs ausgeführt, aber wenn tatsächlich Dummheit ausgeführt wird, sollte sich der Inhalt des Befehls entsprechend den Eingabedaten ändern. Die ersten 5 Jobs sehen so aus.

In [12]: jobs[:5]
Out[12]:
[{'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'}]

Implementieren Sie die Funktion "runjob", die den vom Wörterbuchobjekt dargestellten Job (Befehl) ausführt. Der Wert des "cmd" -Schlüssels des empfangenen Wörterbuchobjekts wird in der Shell ausgeführt, und der Rückgabewert wird im Wörterbuchobjekt gespeichert und zurückgegeben.

In [13]: def runjob(job):
   ....:     import subprocess
   ....:     try:
   ....:         returncode = subprocess.call(job['cmd'], shell=True)
   ....:         return dict(code=returncode)
   ....:     except OSError, e:
   ....:         return dict(error=str(e))
   ....:

Um diese "runjob" -Funktion nacheinander in der Jobwarteschlange auszuführen, rufen Sie vom Client eine "LoadBalancedView" -Instanz ab.

In [14]: lview = rc.load_balanced_view()

Dann wird die Funktion "runjob" für jedes Element der Liste "jobs" asynchron ausgeführt.

In [15]: ar = lview.map_async(runjob, jobs)

Die Ausführung dieses Codes wird nicht blockiert und ein AsyncResult-Objekt wird sofort zurückgegeben. Über dieses AsyncResult-Objekt können Sie das Ergebnis der Jobausführung und den Fortschrittsstatus überprüfen. Lassen Sie uns beispielsweise den Jobausführungsstatus in der interaktiven Shell anzeigen (auch im IPython-Notizbuch verfügbar).

In [16]: ar.wait_interactive()
  48/100 tasks finished after   15 s

Wenn diese Funktion "wait_interactive" aufgerufen wird, wird der Jobausführungsstatus jede Sekunde in der Shell angezeigt. Die obige Anzeige zeigt, dass 48 von 100 Jobs 15 Sekunden nach Beginn der Jobausführung abgeschlossen wurden. Da die für einen Job erforderliche Zeit 10 Sekunden beträgt und 48 Engines gleichzeitig verwendet werden, werden 48 Jobs in 10 Sekunden nach Beginn der Ausführung und weitere 48 Jobs zwischen 10 Sekunden und 20 Sekunden abgeschlossen. Wird durchgeführt. Wenn alle Jobs abgeschlossen sind, wird Folgendes angezeigt.

In [16]: ar.wait_interactive()
 100/100 tasks finished after   30 s
done

Jeder Job wird asynchron ausgeführt, aber die Funktion "wait_interactive" wird erst beendet, wenn alle Jobs abgeschlossen sind. Wenn Sie den Fortschritt des Jobs nicht mehr anzeigen möchten, können Sie ihn mit [Strg] + [c] unterbrechen ("Kernel" - "Interrupt" für IPython-Notebook). Selbst wenn die Anzeige unterbrochen wird, wird die Jobausführung fortgesetzt.

Sie können das Ergebnis der Jobausführung über das Objekt "AsyncResult" überprüfen. Lassen Sie uns die Ausführungsergebnisse der ersten 5 Jobs überprüfen.

In [17]: ar[:5]
Out[17]: [{'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}]

Wenn code`` 0 ist, bedeutet dies, dass der Rückgabewert der Shell ( / bin / sh), die den Befehl sleep ausgeführt hat, 0 war.

Bakapara-Modul

Bakapara ist eine Modularisierung des Prozesses zum Ausführen einer Jobwarteschlange durch einen Befehl in der Shell des IPython-Clusters. Es wurde für die Verwendung mit der interaktiven Shell und dem IPython-Notebook von Python entwickelt. Es kann auch unabhängig von der Kommandozeile bedient werden.

Installation

Wenn Sie es als Modul verwenden möchten, laden Sie bakapara.py herunter und legen Sie es in dem Verzeichnis ab, in dem PYTHONPATH passiert. .. Bei der Ausführung über die Befehlszeile ist es zweckmäßig, sie in einem Verzeichnis zu platzieren, das sich im Pfad befindet.

Anforderungen der Arbeitsstelle

Das Bakapara-Objekt empfängt eine Liste von Jobs und führt sie auf der Cluster-Engine aus. Jeder Job kann in einem beliebigen Format vorliegen, sofern es sich um ein Wörterbuchobjekt handelt, das die folgenden Spezifikationen erfüllt.

Schlüssel Wert Beispiel
cmd Der Befehl, den Sie auf der Engine ausführen möchten. Da der Befehl tatsächlich über die Shell ausgeführt wird, können auch Pipes und Redirects verwendet werden. wc -l /work/001.txt
cd (Optional) Arbeitsverzeichnis, wenn die Engine Befehle ausführt. Wenn dieser Schlüssel nicht vorhanden ist, wird das Arbeitsverzeichnis der Engine nicht geändert. Das Arbeitsverzeichnis jeder Engine wird beim Erstellen des Bakapara-Objekts mit dem Arbeitsverzeichnis initialisiert. /home/okazaki/projects/bakapara
out (Optional) Wenn Sie einen Dateinamen für diesen Wert angeben, wird die Standardausgabe beim Ausführen des Befehls in der Datei gespeichert. Wenn dieser Schlüssel nicht vorhanden ist, wird der Inhalt der Standardausgabe nicht gespeichert. /work/001.txt.out
err (Optional) Wenn Sie einen Dateinamen für diesen Wert angeben, wird die Standardfehlerausgabe beim Ausführen des Befehls in der Datei gespeichert. Wenn dieser Schlüssel nicht vorhanden ist, wird der Inhalt der Standardfehlerausgabe nicht gespeichert. /work/001.txt.err
host (Optional) Eine Liste von Hosts (Engines), die Befehle ausführen können. Wenn dieser Schlüssel nicht vorhanden ist, wird der Job auf allen Engines als ausführbar angesehen. ['mezcal03.cl.ecei.tohoku.ac.jp',]

host wird verwendet, wenn Sie einen bestimmten Job auf einem bestimmten Host ausführen möchten. Wenn die zu verarbeitenden Daten beispielsweise auf der lokalen Festplatte jedes Servers verteilt sind, können Sie den Host angeben, auf dem sich die zur Ausführung des Jobs erforderlichen Daten mit "host" befinden. Darüber hinaus können Sie mit dem verteilten Dateisystem GFarm überprüfen, auf welchem Host sich jede Datei im verteilten Dateisystem befindet Durch Angabe des Hosts, auf dem sich die Daten befinden, kann eine verteilte Parallelverarbeitung unter Berücksichtigung der Datenlokalität wie HDFS + Hadoop realisiert werden.

Das Ausführungsergebnis jedes Jobs wird als Wert des Ergebnisschlüssels des Wörterbuchobjekts gespeichert (überschrieben). Der Wert von "result" ist ein Wörterbuchobjekt mit den folgenden Spezifikationen.

Schlüssel Wert Beispiel
code Code beenden 0
submitted Datum und Uhrzeit der Auftragserteilung durch den Kunden '2014-12-13T01:17:05.593718'
started Datum und Uhrzeit des Starts des Jobs am Motor '2014-12-13T01:17:04.559970'
completed Datum und Uhrzeit des Abschlusses der Auftragsausführung an der Engine '2014-12-13T01:17:14.566251'
received Datum und Uhrzeit des Empfangs des Auftragsausführungsergebnisses durch den Client '2014-12-13T01:17:15.614301'
elapsed Zeitaufwand für die Ausführung des Jobs (completed-started '0:00:10.006281'
engine_id ID (Index) der Engine, die den Job ausgeführt hat 3
host Hostname der Engine, die den Job ausgeführt hat 'mezcal06.cl.ecei.tohoku.ac.jp'
pyerr Python-Ausnahmen (falls vorhanden). Wenn Sie den Hostnamen angeben, um den Job auszuführenUnmetDependencyEine Ausnahme wird möglicherweise angezeigt, dies ist jedoch normal. None
pyout Python-Interpreter-Ausgabe (falls vorhanden) None
status 'ok'Oder'error' 'ok'
msg_id UUID von Nachrichten, die zwischen Client und Engine ausgetauscht werden u'48fbc58b-ef73-4815-9f32-1932c01421ad'
error (Besteht nur, wenn ein schwerwiegender Fehler auftritt.) Fehlermeldung ''

Beachten Sie, dass es Fälle gibt, in denen die Befehlsausführung fehlschlägt, auch wenn "status" "ok" ist. Wenn Sie beispielsweise beim Schreiben des Befehls "cmd" einen Fehler machen und diesen nicht ausführen können, lautet "status" "ok", aber die Standardfehlerausgabe lautet "/ bin / bash: 1: hoge: not found". Eine Nachricht wie \ n'` bleibt erhalten. Der Jobbefehl wird über "/ bin / bash -o Pipe Fail" ausgeführt. Wenn einer der weitergeleiteten Befehle einen anderen Statuscode als "0" zurückgibt, wird der Rückgabewert daher in "Code" gespeichert. Daher ist es wichtig, den Rückgabewert "Code" zu überprüfen.

Verwendung in der interaktiven Python-Shell

Importieren Sie zuerst das Bakapara-Modul und erstellen Sie eine Bakapara-Instanz. Die Spezifikation des Konstruktors der Bakapara-Klasse lautet [IPython.parallel.Client](http://ipython.org/ipython-doc/dev/api/generated/IPython.parallel.client.client.html#IPython.parallel.client] .client.Client) ist das gleiche. Normalerweise geben Sie den Profilnamen wie folgt an.

In [1]: from bakapara import Bakapara

In [2]: bp = Bakapara(profile='mezcal')

Erstellen Sie eine Liste der Jobs, die Sie ausführen möchten. Der folgende Code erstellt einen Job, der den Befehl sleep 10 100 Mal ausführt.

In [3]: jobs = [dict(cmd='sleep 10') for i in range(100)]

Lassen Sie uns zur Erklärung den ersten Job überprüfen.

In [4]: jobs[0]
Out[4]: {'cmd': 'sleep 10'}

Verwenden Sie die run -Methode, um den Job zu starten. Der Rückgabewert "True" zeigt an, dass die Jobregistrierung erfolgreich war.

In [5]: bp.run(jobs)
Out[5]: True

Sie können den Fortschritt des Jobs mit der Methode "status" überprüfen. Der Fortschritt des Auftrags wird jede Sekunde angezeigt.

In [6]: bp.status()
48/100 tasks finished after 0:00:12.554905

Diese "Status" -Methode wird erst ausgeführt, wenn alle Jobs abgeschlossen sind. Wenn Sie etwas anderes tun möchten, drücken Sie [Strg] + [c]("Kernel" - "Interrupt" im IPython-Notizbuch), um zu unterbrechen.

Wenn die Jobausführung abgeschlossen ist, wird Folgendes angezeigt.

In [7]: bp.status()
100/100 tasks finished after 0:00:30.137117
100 tasks completed in 0:00:30.137117

Wenn der Job abgeschlossen ist, wird das Ergebnis in die Jobliste geschrieben (die Jobliste, die der Methode "run" der Klasse "Bakapara" zugewiesen wurde). Um genau zu sein, wird das Ausführungsergebnis in die Liste "Jobs" geschrieben, indem nach Abschluss des Jobs die Methode "status" oder die Methode "wait" aufgerufen wird. Selbst wenn der gesamte Job nicht abgeschlossen ist, werden die Ergebnisse bis zu dem Zeitpunkt, an dem die Methode "status" oder "wait" aufgerufen wird, in die Jobliste geschrieben. Um das Ergebnis der Jobausführung zu erhalten, rufen Sie die Methode "status" auf und überprüfen Sie, ob die Meldung "xx / xxx Aufgaben beendet" angezeigt wird.

Lassen Sie uns das Ausführungsergebnis überprüfen. Sie können auf Informationen wie den Hostnamen, der den Job ausgeführt hat, die erforderliche Zeit und den Endcode zugreifen.

In [8]: jobs[0]
Out[8]:
{'cmd': 'sleep 10',
 'result': {'code': 0,
  'completed': '2014-12-13T12:26:12.665525',
  'elapsed': '0:00:10.005647',
  'engine_id': 11,
  'host': 'mezcal11.cl.ecei.tohoku.ac.jp',
  'msg_id': u'72666439-9364-4a37-9dfb-8a04921d9a0c',
  'pyerr': None,
  'pyout': None,
  'received': '2014-12-13T12:24:38.638917',
  'started': '2014-12-13T12:26:02.659878',
  'status': u'ok',
  'submitted': '2014-12-13T12:23:58.320781'}}

Verwenden Sie über die Befehlszeile

Wie wir gesehen haben, ist die Verwendung des "Bakapara" -Moduls einfach. Obwohl ich Python überhaupt nicht verstehe, denke ich, dass es Leute gibt, die dumme verteilte Parallelverarbeitung durchführen wollen. Wir haben auch eine Befehlszeilenschnittstelle für solche Personen vorbereitet. Wenn bakapara.py alleine ausgeführt wird, wird der JSON des Jobs aus der Standardeingabe gelesen und der JSON des Jobausführungsergebnisses in die Standardausgabe geschrieben. Der Eingabe- und Ausgabe-JSON ist ein Job pro Zeile, und das Format wird übernommen, in dem das Wörterbuchobjekt des Jobs in JSON in jeder Zeile beschrieben wird. Der Grund, warum der gesamte Job nicht im Listenformat gespeichert wird, besteht darin, dass das Ausführungsergebnis ausgeschrieben werden kann, sobald der Job abgeschlossen ist.

Die JSON-Datei eines Jobs, der 100 "Schlaf-10" ausführt, lautet beispielsweise wie folgt.

$ head -n5 sleep.task.json
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}

Führen Sie den folgenden Befehl aus, um diesen Job auszuführen und das Ausführungsergebnis in "sleep.result.json" zu schreiben. Der Profilname des IPython-Clusters wird mit der Option -p angegeben.

$ python bakapara.py -p mezcal < sleep.task.json > sleep.result.json
2014-12-21 10:39:49,231 total 100 jobs on 48 engines
2014-12-21 10:39:59,288 [1/100] returned 0 in 0:00:10.007444 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [2/100] returned 0 in 0:00:10.005645 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [3/100] returned 0 in 0:00:10.005994 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [4/100] returned 0 in 0:00:10.006593 sec on mezcal01.cl.ecei.tohoku.ac.jp: sleep 10
...
2014-12-21 10:40:19,282 [97/100] returned 0 in 0:00:10.005299 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [98/100] returned 0 in 0:00:10.005097 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [99/100] returned 0 in 0:00:10.005758 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [100/100] returned 0 in 0:00:10.004995 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 completed

Von der Standardausgabe wird der ausgeführte Job als Wörterbuchobjekt (in JSON codiert) zurückgegeben. Sie können sehen, dass das Ausführungsergebnis im Schlüssel result gespeichert ist.

$ head -n5 sleep.result.json
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262566", "code": 0, "engine_id": 9, "started": "2014-12-21T10:40:46.649199", "completed": "2014-12-21T10:40:56.656643", "msg_id": "22d664c5-793a-44f1-b29d-c74f2aa434c1", "submitted": "2014-12-21T10:39:49.235879", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.007444", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262205", "code": 0, "engine_id": 11, "started": "2014-12-21T10:40:46.650998", "completed": "2014-12-21T10:40:56.656643", "msg_id": "e8eb5db2-ac9b-481b-b0a4-fdb2ef15be62", "submitted": "2014-12-21T10:39:49.236327", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005645", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.287773", "code": 0, "engine_id": 8, "started": "2014-12-21T10:40:46.679033", "completed": "2014-12-21T10:40:56.685027", "msg_id": "8a7e6fe0-482a-4ae0-a2ff-8321849aa8b0", "submitted": "2014-12-21T10:39:49.244347", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005994", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.284039", "code": 0, "engine_id": 46, "started": "2014-12-21T10:40:46.698136", "completed": "2014-12-21T10:40:56.704729", "msg_id": "f03f9b93-4a60-494b-9a21-625cdcac252e", "submitted": "2014-12-21T10:39:49.242042", "pyerr": null, "host": "mezcal01.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006593", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.259553", "code": 0, "engine_id": 28, "started": "2014-12-21T10:40:46.889807", "completed": "2014-12-21T10:40:56.895995", "msg_id": "bc9e7b74-64ba-45f4-ac0e-31b27db5d862", "submitted": "2014-12-21T10:39:49.234939", "pyerr": null, "host": "mezcal07.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006188", "pyout": null}}

Über Jobunterbrechung

Wenn Sie sich die Ausführungsergebnisse der im Cluster ausgeführten Jobs ansehen, stellen Sie möglicherweise fest, dass der Inhalt des Jobs falsch ist oder dass der Job außer Kontrolle gerät. In diesem Fall wird die Jobausführung abgebrochen. Es gibt zwei Möglichkeiten, einen Joblauf in Bakapara abzubrechen.

  1. Rufen Sie die Methode abort () auf.
  2. Stoppen Sie den "ipcluster" -Prozess und töten Sie alle Engines und Controller.

Bei Methode 1 wird der laufende Job nicht abgebrochen und nur der Job in der Jobwarteschlange wird abgebrochen. Daher kann es nicht verwendet werden, wenn ein Job ausgeführt wird, der sehr lange dauert, oder wenn der Job außer Kontrolle gerät.

Methode 2 ist eine Methode zum gewaltsamen Beenden des laufenden Jobs durch Beenden der Engine und des Controllers der Clusterausführungsumgebung. Insbesondere drücken Sie die Tasten [Strg] + [c] auf der Konsole, auf der "ipcluster" ausgeführt wird.

^C2014-12-14 17:16:40.729 [IPClusterStart] ERROR | IPython cluster: stopping
2014-12-14 17:16:40.730 [IPClusterStart] Stopping Engines...
2014-12-14 17:16:43.734 [IPClusterStart] Removing pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid

Leider bietet die aktuelle Version der LoadBalancedView-Schnittstelle keine Möglichkeit, einen laufenden Job zu stoppen (siehe Ressourcen: [[IPython-dev] Fwd: Parallele Jobs unterbrechen / abbrechen](http: /). /mail.scipy.org/pipermail/ipython-dev/2014-March/013426.html)). Im Notfall muss der "ipcluster" selbst wie in Methode 2 neu gestartet werden.

abschließend

Als ich (um 2005) mit der verteilten Parallelverarbeitung in einer Clusterumgebung begann, verwendete ich GXP Grid & Cluster Shell. Es war. GXP hat die Fähigkeit, Workflows wie make auszuführen, aber ich habe es ausschließlich für dumme Zwecke verwendet. Es ist ein nützliches Werkzeug, aber es scheint, dass die Entwicklung derzeit gestoppt ist. GNU Parallal war möglicherweise für diesen Zweck ausreichend.

Um 2011 hatte ich ernsthaft überlegt, Hadoop zu verwenden, aber es war schwierig, Hadoop auf Laborebene der Universität zu verwalten und zu betreiben. Um ein kleines Programm, das in Python implementiert ist, mit Dummheit zu parallelisieren, ist zusätzliche Arbeit für Hadoop erforderlich, sodass ich der Meinung war, dass die Hürde für die Schüler hoch war. Ich fand das verteilte Dateisystem (HDFS) jedoch sehr praktisch. Daher habe ich begonnen, GFarm als verteiltes Dateisystem zu verwenden. Um die auf dem verteilten Dateisystem platzierten Daten unter Berücksichtigung der Lokalität zu verarbeiten, haben wir mit Paramiko ein eigenes Jobwarteschlangensystem implementiert. Das Wrack des Jobwarteschlangensystems ist gfqsub.

In jüngster Zeit wurden jedoch Experimente und Datenanalysen für IPython-Notizbuch durchgeführt. IPython-Notebook ist sehr praktisch, aber seine Kompatibilität mit Befehlszeilentools ist nicht gut. Deshalb habe ich nach einer Python-Bibliothek gesucht, die Dummheit erkennt. Als ich nachgeschlagen habe, hat der Python-Cluster die verteilte Parallelverarbeitung unterstützt, aber es gab nur wenige Informationen außer der offiziellen Dokumentation. Deshalb habe ich Bakapara erstellt, während ich die Dokumentation selbst gelesen habe.

Dieses Mal wurde mir mit SSH Dummheit klar, aber IPython Cluster scheint in der Lage zu sein, MPI, PBS (qsub), Windows HPC Server und Amazon EC2 auf einheitliche Weise zu verwenden, indem ich "ipcluster_config.py" einstelle. Vielleicht funktioniert das Bakapara-Modul in diesen Umgebungen noch (ich habe es selbst nicht getestet). Darüber hinaus kann der IPython-Cluster eine ziemlich fortgeschrittene verteilte Parallelverarbeitung realisieren, beispielsweise den Austausch von Codedaten zwischen dem Controller und der Engine. Ich möchte weiter untersuchen, welche interessanten Dinge mit diesen Funktionen möglich sind.

Recommended Posts

Dumm (verteilte Parallelverarbeitung) durch IPython-Cluster
Python verteilte Verarbeitung Spartan
Einführung in die verteilte Parallelverarbeitung von Python durch Ray
Parallelverarbeitung mit Mehrfachverarbeitung
Parallele Berechnung mit iPython Notebook
Parallelverarbeitung mit lokalen Funktionen
Blender Modal Operator Parallelverarbeitung
Parallele Verarbeitung mit Parallel von Scikit-Learn