Embarrassingly parallel (distributed parallel) processing with IPython cluster

A basic and powerful approach to distributed parallel processing is Single Program Multiple Data (SPMD). This is a model in which multiple processors execute the same program, and each processor processes different data. By dividing large-scale data into units that can be processed independently and processing the subdivided data in parallel by multiple processors, the processing time of the entire data can be significantly reduced.

For example, consider running the command `job` with `a01.txt`, `a02.txt`, `a03.txt`, and `a04.txt` as input files and storing the results (output) in `b01.txt`, `b02.txt`, `b03.txt`, and `b04.txt`, respectively. The following bash shell script implements this process.

#!/bin/bash
# Run `job` on each input file and write its output to the corresponding b*.txt file.
for src in "$@"
do
   job < "$src" > "${src/a/b}"
done

Since the iterations of this for loop are independent of each other, they can easily be parallelized to shorten the overall processing time.

(Figure: bakapara.png)
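For comparison, the same pattern can be sketched in Python on a single machine. The snippet below is only an illustration of the idea, using multiprocessing.Pool together with the hypothetical `job` command and file names from the example above; the rest of the article is about spreading this kind of work across many machines.

import subprocess
from multiprocessing import Pool

def run(src):
    # Run `job` with src on stdin and write stdout to the corresponding b*.txt file.
    dst = src.replace('a', 'b', 1)
    with open(src) as fi, open(dst, 'w') as fo:
        return subprocess.call(['job'], stdin=fi, stdout=fo)

if __name__ == '__main__':
    srcs = ['a01.txt', 'a02.txt', 'a03.txt', 'a04.txt']
    pool = Pool(4)                   # four worker processes on one machine
    print(pool.map(run, srcs))       # exit code of `job` for each input file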

In natural language processing and machine learning, many tasks, such as morphological analysis and feature extraction, can easily be parallelized by splitting the data. In Japan, this kind of (distributed) parallel processing is colloquially called *bakapara* (roughly "stupidly parallel", i.e. embarrassingly parallel). This article explains how to do embarrassingly parallel command execution with IPython cluster and introduces Bakapara (https://github.com/chokkan/bakapara), a library specialized for this purpose.

Introduction

According to the Architecture overview in the official documentation, an IPython cluster consists of the following four elements:

  1. **Engine**: An IPython interpreter that runs the Python code to be executed in parallel. Start as many engines as the degree of parallelism you want, on the hosts you want to use. Each engine blocks all other operations while the user's code is running.
  2. **Controller**: The interface for operating the group of engines. Users manage the engines by operating the controller. Internally, it consists of one hub and multiple schedulers.
  3. **Hub**: The heart of the cluster execution environment. It centrally manages the connections to the engines, schedulers, and clients, as well as execution results.
  4. **Scheduler**: Manages the jobs submitted to the engines.

To build a parallel execution environment with IPython, you need to start one controller and multiple engines. There are two ways to start the controller and engine.

  1. Start the controller and engines automatically with the `ipcluster` command
  2. Start the controller manually with the `ipcontroller` command and the engines manually with the `ipengine` command

This time, we will take the simpler route and build the cluster environment with the `ipcluster` command.

To make the explanation concrete, this article uses the server configuration that appears in the examples below: twelve hosts, mezcal01 through mezcal12, that share a home directory.

IPython cluster settings

Create and edit the configuration file by referring to the official document "Using ipcluster in SSH mode". IPython cluster conveniently manages cluster execution environments in units called **profiles**. The profile name is arbitrary; here we create a profile named "mezcal", after the name of the server group that hosts the engines.

$ ipython profile create --parallel --profile=mezcal
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_nbconvert_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcontroller_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipengine_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcluster_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/iplogger_config.py'

A directory called `$HOME/.ipython/profile_{profile name}` is created, containing the IPython cluster configuration file `ipcluster_config.py`. Open this file in an editor and configure the engines on each node to be started via SSH. The relevant setting can be located by the comments around it; set `c.IPClusterStart.engine_launcher_class` to `'SSH'`.

ipcluster_config.py


# The class for launching a set of Engines. Change this value to use various
# batch systems to launch your engines, such as PBS,SGE,MPI,etc. Each launcher
# class has its own set of configuration options, for making sure it will work
# in your environment.
#
# You can also write your own launcher, and specify it's absolute import path,
# as in 'mymodule.launcher.FTLEnginesLauncher`.
#
# IPython's bundled examples include:
#
#     Local : start engines locally as subprocesses [default]
#     MPI : use mpiexec to launch engines in an MPI environment
#     PBS : use PBS (qsub) to submit engines to a batch queue
#     SGE : use SGE (qsub) to submit engines to a batch queue
#     LSF : use LSF (bsub) to submit engines to a batch queue
#     SSH : use SSH to start the controller
#                 Note that SSH does *not* move the connection files
#                 around, so you will likely have to do this manually
#                 unless the machines are on a shared file system.
#     HTCondor : use HTCondor to submit engines to a batch queue
#     WindowsHPC : use Windows HPC
#
# If you are using one of IPython's builtin launchers, you can specify just the
# prefix, e.g:
#
#     c.IPClusterEngines.engine_launcher_class = 'SSH'
#
# or:
#
#     ipcluster start --engines=MPI
c.IPClusterStart.engine_launcher_class = 'SSH'

Although the difference from the `c.IPClusterStart.engine_launcher_class` setting above is not clear, also set `c.IPClusterEngines.engine_launcher_class` to `'SSH'`. In addition, specify the host names and the number of engines (the degree of parallelism) for distributed parallel processing in the dictionary object `c.SSHEngineSetLauncher.engines`: the keys are host names and the values are the numbers of engines. The following example starts 4 engines on each of the twelve hosts mezcal01.cl.ecei.tohoku.ac.jp through mezcal12.cl.ecei.tohoku.ac.jp, allowing up to 48-way parallel processing.

ipcluster_config.py


c.IPClusterEngines.engine_launcher_class = 'SSH'

c.SSHEngineSetLauncher.engines = {
    'mezcal01.cl.ecei.tohoku.ac.jp': 4,
    'mezcal02.cl.ecei.tohoku.ac.jp': 4,
    'mezcal03.cl.ecei.tohoku.ac.jp': 4,
    'mezcal04.cl.ecei.tohoku.ac.jp': 4,
    'mezcal05.cl.ecei.tohoku.ac.jp': 4,
    'mezcal06.cl.ecei.tohoku.ac.jp': 4,
    'mezcal07.cl.ecei.tohoku.ac.jp': 4,
    'mezcal08.cl.ecei.tohoku.ac.jp': 4,
    'mezcal09.cl.ecei.tohoku.ac.jp': 4,
    'mezcal10.cl.ecei.tohoku.ac.jp': 4,
    'mezcal11.cl.ecei.tohoku.ac.jp': 4,
    'mezcal12.cl.ecei.tohoku.ac.jp': 4,
}

If the server running your terminal or IPython notebook is different from the server running the controller, you need to allow connections to the controller from other servers. If the servers are on a trusted LAN, it is convenient to allow all hosts to connect to the controller: add `"--ip='*'"` to the startup options of `ipcontroller` (by default, only localhost can connect).

ipcluster_config.py


#------------------------------------------------------------------------------
# LocalControllerLauncher configuration
#------------------------------------------------------------------------------

# Launch a controller as a regular external process.

# command-line args to pass to ipcontroller
c.LocalControllerLauncher.controller_args = ["--ip='*'", '--log-to-file', '--log-level=20']

This time, the home directory is shared between the controller and the engine host, so add the following settings.

ipcluster_config.py


#------------------------------------------------------------------------------
# SSHLauncher configuration
#------------------------------------------------------------------------------

# A minimal launcher for ssh.
#
# To be useful this will probably have to be extended to use the ``sshx`` idea
# for environment variables.  There could be other things this needs as well.

# hostname on which to launch the program
# c.SSHLauncher.hostname = ''

# command for starting ssh
# c.SSHLauncher.ssh_cmd = ['ssh']

# user@hostname location for ssh in one setting
# c.SSHLauncher.location = ''

# List of (local, remote) files to send before starting
c.SSHLauncher.to_send = []

# command for sending files
# c.SSHLauncher.scp_cmd = ['scp']

# List of (remote, local) files to fetch after starting
c.SSHLauncher.to_fetch = []

# args to pass to ssh
# c.SSHLauncher.ssh_args = ['-tt']

# username for ssh
# c.SSHLauncher.user = ''

To understand what these settings mean, you need to know how an IPython cluster starts up. The startup flow is as follows:

  1. Start the controller
  2. Transfer `ipcontroller-engine.json`, created by the controller, to each engine host with scp
  3. Start the engines

If the home directory is shared, step 2 is unnecessary, which is why `c.SSHLauncher.to_send` and `c.SSHLauncher.to_fetch` are set to empty lists above.

Start cluster

Execute the `ipcluster` command on the host where you want to run the controller to start the controller and engines together. Specify the profile name with the `--profile` option.

$ ipcluster start --profile=mezcal
2014-12-11 14:15:49.891 [IPClusterStart] Using existing profile dir: u'/home/okazaki/.ipython/profile_mezcal'
2014-12-11 14:15:50.023 [IPClusterStart] Starting ipcluster with [daemon=False]
2014-12-11 14:15:50.025 [IPClusterStart] Creating pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid
2014-12-11 14:15:50.025 [IPClusterStart] Starting Controller with LocalControllerLauncher
2014-12-11 14:15:51.024 [IPClusterStart] Starting 48 Engines with SSH
2014-12-11 14:16:25.117 [IPClusterStart] Engines appear to have started successfully

If "Engines appear to have started successfully" is displayed, it is successful. From the message "Starting 48 Engines with SSH", you can confirm that $ 12 \ times 4 = 48 $ of engines have been started.

Run the program on the cluster

Import the `IPython.parallel` module.

In [1]: from IPython.parallel import Client

Create a Client object to operate the controller and engine. Specify the profile name in the profile argument.

In [2]: rc = Client(profile='mezcal')

If you check the ID of the engine to which the Client object is connected, you can confirm that it is connected to 48 engines.

In [3]: rc.ids
Out[3]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47]

The following steps are not specific to embarrassingly parallel processing, but they illustrate the general usage of IPython cluster. A DirectView instance executes code directly on each engine, without going through the task scheduler.

In [4]: dview = rc[:]

The following code computes $x^2$ for $x \in \{0, 1, \ldots, 49\}$ (without parallelization).

In [5]: serial_result =  map(lambda x: x**2, range(50))

Let's parallelize this computation over the elements $x \in \{0, 1, \ldots, 49\}$.

In [6]: parallel_result = dview.map_sync(lambda x: x**2, range(50))

The results computed by the engines are gathered and stored in parallel_result.

In [7]: parallel_result
Out[7]:
[0,
 1,
 4,
 9,
 ...
 2401]

Of course, the result is the same with or without parallelization.

In [8]: serial_result == parallel_result
Out[8]: True

You can use the `remote` decorator to define a function (a remote function) that is executed on each engine. The `gethostname` function below simply returns the host name obtained with `socket.getfqdn()`; note that the `socket` module is imported inside the function. Importing a module on the client does not mean the IPython process on each engine has imported it, so modules must be imported inside the function.

In [9]: @dview.remote(block=True)
   ...: def gethostname():
   ...:     import socket
   ...:     return socket.getfqdn()
   ...:

Calling the `gethostname` function on the client returns the host name of each engine. The order is arbitrary, but you can see that four engines are running on each of the hosts mezcal01 through mezcal12.

In [10]: gethostname()
Out[10]: 
['mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 ...
 'mezcal01.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp']

There is also a `parallel` decorator for defining functions that run in parallel. For more information, see the documentation for IPython's Direct interface.
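As a minimal sketch of this usage (the function f and the constant are illustrative, and block=True makes the call synchronous), a parallel function distributes its input across the engines, and its map method applies the function element-wise:

@dview.parallel(block=True)
def f(x):
    # Runs on the engines; map applies it to each element of the sequence.
    return 10.0 * x**4

f.map(range(32))    # computed in parallel across the engines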

Execution via scheduler

A `LoadBalancedView` instance executes jobs with dynamic load balancing. You cannot access individual engines directly, but it lets you implement a job queue over the cluster engines, similar to the job queue of `multiprocessing.Pool`.

As a simple example, let's run the command sleep 10 on each engine. Create a list containing the jobs you want to execute.

In [11]: jobs = [dict(cmd='sleep 10') for i in range(100)]

Each element of this list is a dictionary that stores the command to execute under the `cmd` key. Here every job runs `sleep 10`, but in a real embarrassingly parallel run the command would vary with the input data (see the sketch after the listing below). The first 5 jobs look like this.

In [12]: jobs[:5]
Out[12]:
[{'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'}]
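In a real run, each job's command would typically be built from the input data, in the same spirit as the shell-script example at the beginning of this article. A minimal sketch (the `job` command and file names are the hypothetical ones from that example):

srcs = ['a01.txt', 'a02.txt', 'a03.txt', 'a04.txt']
jobs = [dict(cmd='job < {0} > {1}'.format(src, src.replace('a', 'b', 1)))
        for src in srcs]
# jobs[0] == {'cmd': 'job < a01.txt > b01.txt'}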

Implement a function `runjob` that executes the job (command) represented by a dictionary object. It runs the value of the `cmd` key of the received dictionary on the shell and returns the exit code wrapped in a dictionary.

In [13]: def runjob(job):
   ....:     import subprocess
   ....:     try:
   ....:         returncode = subprocess.call(job['cmd'], shell=True)
   ....:         return dict(code=returncode)
   ....:     except OSError, e:
   ....:         return dict(error=str(e))
   ....:

To execute this `runjob` function through the job queue, obtain a `LoadBalancedView` instance from the client.

In [14]: lview = rc.load_balanced_view()

Then, the runjob function is executed asynchronously for each element of the jobs list.

In [15]: ar = lview.map_async(runjob, jobs)

This call does not block; it immediately returns an `AsyncResult` object. You can check the job results and progress via this `AsyncResult` object. For example, let's display the job execution status on the interactive shell (this also works on IPython notebook).

In [16]: ar.wait_interactive()
  48/100 tasks finished after   15 s

Calling `wait_interactive` displays the job execution status on the shell, updated every second. The output above shows that 48 of the 100 jobs had completed 15 seconds after execution started. Since each job takes 10 seconds and 48 engines run concurrently, 48 jobs finish within the first 10 seconds and another 48 run between 10 and 20 seconds. When all jobs have completed, the following is displayed.

In [16]: ar.wait_interactive()
 100/100 tasks finished after   30 s
done

Each job runs asynchronously, but the `wait_interactive` function does not return until all jobs have completed. If you want to stop displaying the progress, interrupt it with [Ctrl]+[c] ("Kernel" - "Interrupt" in IPython notebook). Even if the display is interrupted, the jobs keep running.

You can check the job results via the `AsyncResult` object. Let's look at the results of the first 5 jobs.

In [17]: ar[:5]
Out[17]: [{'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}]

A `code` of 0 means that the shell (`/bin/sh`) that executed the `sleep` command returned 0.

Bakapara module

Bakapara packages this process of executing a queue of shell commands on an IPython cluster as a module. It is designed to be used from the Python interactive shell or IPython notebook, and it can also be run standalone from the command line.

Installation

To use it as a module, download bakapara.py and place it in a directory on your PYTHONPATH. To run it from the command line, it is convenient to place it in a directory on your PATH.

Job specifications

A Bakapara object receives a list of jobs and runs them on the cluster engines. Each job can be any dictionary object that meets the following specification.

| Key | Value | Example |
|-----|-------|---------|
| cmd | The command to execute on the engine. Since commands are run via the shell, pipes and redirects can be used. | wc -l /work/001.txt |
| cd | (Optional) Working directory in which the engine executes the command. If this key is absent, the engine's working directory is left unchanged. Each engine's working directory is initialized to the working directory at the time the Bakapara object is constructed. | /home/okazaki/projects/bakapara |
| out | (Optional) If a file name is given, the standard output of the command is saved to that file. If this key is absent, the standard output is not saved. | /work/001.txt.out |
| err | (Optional) If a file name is given, the standard error output of the command is saved to that file. If this key is absent, the standard error output is not saved. | /work/001.txt.err |
| host | (Optional) List of hosts (engines) allowed to execute the command. If this key is absent, the job may run on any engine. | ['mezcal03.cl.ecei.tohoku.ac.jp',] |

`host` is used when a particular job must run on particular hosts. For example, if the data to be processed is spread over the local disks of the servers, you can use `host` to name the hosts that hold the data a job needs. The distributed file system GFarm lets you look up which host holds each file, so by specifying the hosts where the data resides you can realize distributed parallel processing that takes data locality into account, in the same spirit as HDFS + Hadoop.
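Putting the specification together, a single job that uses all of the optional keys might look like this (a sketch; the paths and host are just the example values from the table above):

job = {
    'cmd':  'wc -l /work/001.txt',                 # run via the shell
    'cd':   '/home/okazaki/projects/bakapara',     # working directory on the engine
    'out':  '/work/001.txt.out',                   # file that receives the standard output
    'err':  '/work/001.txt.err',                   # file that receives the standard error output
    'host': ['mezcal03.cl.ecei.tohoku.ac.jp'],     # engines allowed to run this job
}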

The execution result of each job is stored (written back) under the `result` key of the job's dictionary object. The value of `result` is a dictionary object with the following specification.

| Key | Value | Example |
|-----|-------|---------|
| code | Exit code | 0 |
| submitted | Date and time the client submitted the job | '2014-12-13T01:17:05.593718' |
| started | Date and time the job started running on the engine | '2014-12-13T01:17:04.559970' |
| completed | Date and time the job finished running on the engine | '2014-12-13T01:17:14.566251' |
| received | Date and time the client received the job result | '2014-12-13T01:17:15.614301' |
| elapsed | Time taken to execute the job (completed - started) | '0:00:10.006281' |
| engine_id | ID (index) of the engine that executed the job | 3 |
| host | Host name of the engine that executed the job | 'mezcal06.cl.ecei.tohoku.ac.jp' |
| pyerr | Python exception, if any. When a job is restricted to specific hosts, UnmetDependencyException may appear here; this is normal. | None |
| pyout | Python interpreter output, if any | None |
| status | 'ok' or 'error' | 'ok' |
| msg_id | UUID of the messages exchanged between the client and the engine | u'48fbc58b-ef73-4815-9f32-1932c01421ad' |
| error | Error message (present only when a fatal error occurs) | '' |

Note that command execution can fail even when `status` is `'ok'`. For example, if the command in `cmd` is misspelled and cannot be executed, `status` will still be `'ok'`, but a message like `/bin/bash: 1: hoge: not found\n` will remain in the standard error output. Job commands are executed via `/bin/bash -o pipefail`, so if any command in a pipeline exits with a non-zero status, that value is stored in `code`. It is therefore important to check the exit code in `code`.
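Because the exit code is the only reliable indicator of this kind of failure, it is worth scanning the results after a run. A minimal sketch, assuming `jobs` is a job list whose results have already been written back:

# Collect jobs whose shell exit code was non-zero or that report a fatal error.
failed = [job for job in jobs
          if job['result'].get('code', 0) != 0 or 'error' in job['result']]
for job in failed:
    print(job['cmd'], job['result'])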

Use from the Python interactive shell

First, import the bakapara module and create a Bakapara instance. The constructor of the Bakapara class takes the same arguments as [IPython.parallel.Client](http://ipython.org/ipython-doc/dev/api/generated/IPython.parallel.client.client.html#IPython.parallel.client.client.Client). Normally, you specify the profile name as follows.

In [1]: from bakapara import Bakapara

In [2]: bp = Bakapara(profile='mezcal')

Create a list of the jobs you want to run. The following code creates 100 jobs, each executing the command `sleep 10`.

In [3]: jobs = [dict(cmd='sleep 10') for i in range(100)]

For the sake of explanation, let's check the first job.

In [4]: jobs[0]
Out[4]: {'cmd': 'sleep 10'}

Use the run method to start the job. The return value True indicates that the job registration was successful.

In [5]: bp.run(jobs)
Out[5]: True

You can check the progress of the job with the status method. The progress of the job is displayed every second.

In [6]: bp.status()
48/100 tasks finished after 0:00:12.554905

This `status` method does not return until all jobs have completed. If you want to do something else, press [Ctrl]+[c] ("Kernel" - "Interrupt" in IPython notebook) to interrupt it.

When the job execution is completed, the following will be displayed.

In [7]: bp.status()
100/100 tasks finished after 0:00:30.137117
100 tasks completed in 0:00:30.137117

When jobs complete, their results are written back to the job list (the list passed to the `run` method of the Bakapara class). To be precise, the results are written to the `jobs` list when the `status` or `wait` method is called after jobs have completed. Even if not all jobs have finished, the results available at the time `status` or `wait` is called are written back. To retrieve the results, call the `status` method and check that a message like "xx/xxx tasks finished" is displayed.

Let's check the execution result. You can access information such as the host name that executed the job, the required time, and the exit code.

In [8]: jobs[0]
Out[8]:
{'cmd': 'sleep 10',
 'result': {'code': 0,
  'completed': '2014-12-13T12:26:12.665525',
  'elapsed': '0:00:10.005647',
  'engine_id': 11,
  'host': 'mezcal11.cl.ecei.tohoku.ac.jp',
  'msg_id': u'72666439-9364-4a37-9dfb-8a04921d9a0c',
  'pyerr': None,
  'pyout': None,
  'received': '2014-12-13T12:24:38.638917',
  'started': '2014-12-13T12:26:02.659878',
  'status': u'ok',
  'submitted': '2014-12-13T12:23:58.320781'}}

Use from the command line

As we have seen, the Bakapara module is easy to use. Still, some people who do not know Python at all may want to do embarrassingly parallel distributed processing, so a command-line interface is provided as well. When bakapara.py is run standalone, it reads jobs as JSON from standard input and writes the results as JSON to standard output. The input and output contain one job per line, with each job's dictionary object encoded as JSON on its own line. The whole job list is deliberately not stored as a single JSON array so that each result can be written out as soon as its job completes.

For example, a job file that runs `sleep 10` 100 times looks like this.

$ head -n5 sleep.task.json
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}

To execute these jobs and write the results to sleep.result.json, run the following command. The IPython cluster profile name is specified with the -p option.

$ python bakapara.py -p mezcal < sleep.task.json > sleep.result.json
2014-12-21 10:39:49,231 total 100 jobs on 48 engines
2014-12-21 10:39:59,288 [1/100] returned 0 in 0:00:10.007444 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [2/100] returned 0 in 0:00:10.005645 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [3/100] returned 0 in 0:00:10.005994 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [4/100] returned 0 in 0:00:10.006593 sec on mezcal01.cl.ecei.tohoku.ac.jp: sleep 10
...
2014-12-21 10:40:19,282 [97/100] returned 0 in 0:00:10.005299 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [98/100] returned 0 in 0:00:10.005097 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [99/100] returned 0 in 0:00:10.005758 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [100/100] returned 0 in 0:00:10.004995 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 completed

Each executed job is written to standard output as a JSON-encoded dictionary object, with the execution result stored under the result key.

$ head -n5 sleep.result.json
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262566", "code": 0, "engine_id": 9, "started": "2014-12-21T10:40:46.649199", "completed": "2014-12-21T10:40:56.656643", "msg_id": "22d664c5-793a-44f1-b29d-c74f2aa434c1", "submitted": "2014-12-21T10:39:49.235879", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.007444", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262205", "code": 0, "engine_id": 11, "started": "2014-12-21T10:40:46.650998", "completed": "2014-12-21T10:40:56.656643", "msg_id": "e8eb5db2-ac9b-481b-b0a4-fdb2ef15be62", "submitted": "2014-12-21T10:39:49.236327", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005645", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.287773", "code": 0, "engine_id": 8, "started": "2014-12-21T10:40:46.679033", "completed": "2014-12-21T10:40:56.685027", "msg_id": "8a7e6fe0-482a-4ae0-a2ff-8321849aa8b0", "submitted": "2014-12-21T10:39:49.244347", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005994", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.284039", "code": 0, "engine_id": 46, "started": "2014-12-21T10:40:46.698136", "completed": "2014-12-21T10:40:56.704729", "msg_id": "f03f9b93-4a60-494b-9a21-625cdcac252e", "submitted": "2014-12-21T10:39:49.242042", "pyerr": null, "host": "mezcal01.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006593", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.259553", "code": 0, "engine_id": 28, "started": "2014-12-21T10:40:46.889807", "completed": "2014-12-21T10:40:56.895995", "msg_id": "bc9e7b74-64ba-45f4-ac0e-31b27db5d862", "submitted": "2014-12-21T10:39:49.234939", "pyerr": null, "host": "mezcal07.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006188", "pyout": null}}

About job interruption

While watching the results of jobs running on the cluster, you may notice that the jobs are wrong or have run away and want to cancel them. There are two ways to cancel a run in Bakapara.

  1. Call the `abort()` method.
  2. Stop the `ipcluster` process, killing all engines and the controller.

Method 1 cancels only the jobs still waiting in the queue; jobs that are already running are not cancelled. It is therefore of no help when a very long-running or runaway job is executing.

Method 2 forcibly terminates the running jobs by shutting down the engines and controller of the cluster execution environment. Concretely, press [Ctrl]+[c] in the console where `ipcluster` is running.

^C2014-12-14 17:16:40.729 [IPClusterStart] ERROR | IPython cluster: stopping
2014-12-14 17:16:40.730 [IPClusterStart] Stopping Engines...
2014-12-14 17:16:43.734 [IPClusterStart] Removing pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid

Unfortunately, the current version of the LoadBalancedView interface does not provide a way to stop a running job (see [[IPython-dev] Fwd: interrupt/abort parallel jobs](http://mail.scipy.org/pipermail/ipython-dev/2014-March/013426.html)). In an emergency, you have to restart `ipcluster` itself, as in method 2.

In conclusion

When I started doing distributed parallel processing in a cluster environment (around 2005), I used GXP Grid & Cluster Shell. GXP can run make-like workflows, but I used it exclusively for embarrassingly parallel work. It is a useful tool, but its development currently appears to have stopped. GNU Parallel might also have been sufficient for this purpose.

Around 2011 I seriously considered using Hadoop, but managing and operating Hadoop at the scale of a university laboratory was difficult. Also, running a small Python program in an embarrassingly parallel fashion requires extra work on Hadoop, so I felt the hurdle was too high for students. On the other hand, I found the distributed file system (HDFS) very convenient, so I started using GFarm as a distributed file system. To process data on the distributed file system with locality in mind, I implemented my own job queue system using Paramiko; its remains are gfqsub.

Recently, however, I have started running experiments and data analysis on IPython notebook. IPython notebook is very convenient, but it does not mesh well with command-line tools, so I looked for a Python library for embarrassingly parallel processing. I found that IPython cluster supports distributed parallel processing, but there was little information beyond the official documentation, so I wrote Bakapara while reading the documentation myself.

This time I realized embarrassingly parallel processing over SSH, but IPython cluster can apparently also use MPI, PBS (qsub), Windows HPC Server, and Amazon EC2 in a unified way by configuring `ipcluster_config.py`. The Bakapara module will probably work in those environments as well (I have not tested it myself). IPython cluster can also support fairly advanced distributed parallel processing, such as exchanging code and data between the controller and engines. I would like to keep exploring what interesting things can be done with these features.
