[PYTHON] Precautions when handling Luigi

Luigi:2.5.0 python:3.6

Precautions when handling Luigi

Luigi has an article that avoids the problem that parallel processing is not possible in the Windows environment. Force luigi to do parallel processing in windows environment

About the number of executions of requires and run

~~ There is a problem with the generator, ~~ The requires method and run method of the task that returns the dependent task are called multiple times by the scheduler. This means that the process written in run or requires will be executed multiple times depending on the situation. Therefore, it is safer not to write expensive processes or processes that affect the outside in the method that returns dependent tasks.

When transitioning from the dependent task to the processing of the dependent task, the generator object of the dependent task is overwritten, and when returning to the dependent task, a new generator object is acquired again, so the beginning of the method each time It will be restarted from. https://github.com/mtoriumi/luigi/blob/5678b6119ed260e8fb43410675be6d6daea445d1/luigi/worker.py#L130

regenerated_task_gen.gif

Sample:

from luigi import Task, run
from luigi.mock import MockTarget
from inspect import currentframe


class DependentTask(Task):

    def output(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        return MockTarget('out.txt')

    def run(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        with self.output().open('w') as fout:
            fout.write('DependentTask is succeeded')

    def on_success(self):
        print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))

    def on_failure(self, exception):
        print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))


class StartTask(Task):

    def output(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        return MockTarget("StartTaskOut.txt")

    def run(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        with self.output().open('w') as fout:
            fout.write('StartTask is succeeded')
        yield DependentTask()

    def on_success(self):
        print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))

    def on_failure(self, exception):
        print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))


if __name__ == '__main__':
    run(main_task_cls=StartTask)

Output:

running StartTask.output
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
running DependentTask.run
running DependentTask.output
Reached DependentTask.on_success
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
Reached StartTask.on_success

About the retry count limit for each task

~~ There is a bug in the process of retry_count, which specifies the number of retries for each task, and it is said that the scheduler's retry count limit setting will be used for the second and subsequent retries. Currently, it is not working properly, so we recommend that you do not use it. The cause is that the retry policy (retry_policy_dict) is not specified in the task registration location from the second time onward below. https://github.com/spotify/luigi/blob/b33aa3033405bfe41d9f5a806d70fb8e98214d86/luigi/worker.py#L974-L984~~

~~ Similar issues have been asked below in the past. http://stackoverflow.com/questions/39595786/luigi-per-task-retry-policy~~

~~ This is an issue that is currently sending a pull request and is awaiting review. https://github.com/spotify/luigi/pull/2012~~

It has been merged.

About luigid (central scheduler) settings

The LUIGI_CONFIG_PATH environment variable must also be specified when starting luigid. luigid works independently, so if you want to reflect the contents of the scheduler section, you have to specify the configuration file when you start luigid.

About the processing order of task methods

Tasks are processed in the order ʻoutput () => requires`` => run () `.

Passing parameters from the command line

When passing parameters from the command line, the underscores in the parameter names are changed to hyphens.

Type conversion when taking parameters

If you take a parameter with DictParameter, it will be a unique type called FrozenOrderedDict. Therefore, some methods that could be used in the built-in type cannot be used.

Good value to pass to the parameter

Parameters other than built-in type cannot be routed between parallel tasks. If an object is given, it will be converted to a class name string. If it is in series, it can be handed over. This is a specification that parallel tasks operate in multiple processes, and the parameters are once converted to JSON format.

Task end condition

There are two conditions for completing a task.

--Create file to target specified by output = open / close target (successful task) --Exception in task (task failure)

If any of the above are not met, the task will not end forever until killed.

Exception occurred while opening the target

If an exception occurs while the target is open in write mode, the temporary write file will remain as garbage due to File busy, so do not write a process that can cause an exception after opening the target.

Recommended Posts

Precautions when handling Luigi
Precautions when installing fbprophet
Precautions when using Chainer
Error handling when installing mecab-python
Precautions when outputting to BigQuery table every hour with Luigi
Precautions when using pit in Python
Precautions when inheriting the DatasetMixin class
Precautions when using TextBlob trait analysis
Precautions when installing tensorflow with anaconda
Precautions when using codecs and pandas
Precautions when using the urllib.parse.quote function
Precautions when creating a Python generator
Error handling when updating Fish shell
Precautions when using phantomjs from python
Precautions when using six with Python 2.5
Precautions and error handling when calling .NET DLL from python using pythonnet
Precautions for handling png and jpg images
Precautions when pickling a function in python
Precautions when solving DP problems with Python
Precautions when using for statements in pandas