[PYTHON] To improve the reusability and maintainability of workflows created with Luigi

About this article

In this article, based on my experience using Luigi, a workflow management framework that runs on Python, for data analysis, I describe practices that I think make workflows easier to maintain. Some readers may find parts of what follows inappropriate, but these topics do not seem to have been discussed much in the first place, so I hope this article serves as a starting point for discussion.

This article does not cover what kind of tool Luigi is, what its advantages are, or how to use it at a basic level. Other Qiita authors have already written about that, so please refer to their articles (Big data analysis using data flow control framework Luigi / 453aeec7f4f420b91241, Workflow management with Luigi, etc.).

How to use Parameter

This section covers how to handle Parameters, which are indispensable for controlling the behavior of tasks.

Do not use instance variables other than those defined as Parameter

Obviously, if you don't use Parameter, Luigi cannot tell apart tasks that you want to run with different settings. Also, setting an instance variable other than a Parameter when the workflow is loaded requires writing an __init__ method, which makes the code needlessly complicated.

When creating a task by inheriting from another task, redeclare all Parameters of the parent class

In Luigi, when you instantiate a task that inherits from another task, for example as a dependency, you must supply a value for every Parameter defined as a class variable anywhere in its inheritance chain, unless that Parameter has a default. For example, suppose you define a group of tasks as follows.

import luigi


class Parent(luigi.ExternalTask):
    hoge = luigi.Parameter()


class Child(Parent):
    foo = luigi.Parameter()


# Raises MissingParameterException: hoge, inherited from Parent, is not set
class All1(luigi.WrapperTask):
    def requires(self):
        yield Child(foo="foo")


# Runs: both foo and the inherited hoge are set
class All2(luigi.WrapperTask):
    def requires(self):
        yield Child(foo="foo", hoge="hoge")

All2 above runs, but All1, which instantiates Child with only foo set, raises a MissingParameterException. In addition to foo, which is Child's own class variable, you must also set hoge, which is defined as a class variable of Parent. Given that, it is kinder to the reader to state explicitly in Child everything that needs to be set, as follows.

class Child(Parent):
    foo = luigi.Parameter()
    hoge = luigi.Parameter()

Bundle complicated groups of Parameters into a dict

For example, consider a task that writes its output with csv.writer, and suppose you want to run calculations that change the behavior of csv.writer through its keyword arguments. Rather than declaring each keyword argument of csv.writer as a separate Parameter, it is clearer and easier to change if you store them all in a single Parameter, as shown below.

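import csv

import luigi


class OutputCSV(luigi.Task):

    # csv.writer names the field separator "delimiter"; it has no "sep" argument.
    csv_writer_kwargs = luigi.Parameter(default=dict(delimiter='\t'))
    ...
    def run(self):
        # On Python 3 the csv module expects a text-mode file opened with newline=''.
        with open(self.output().path, 'w', newline='') as file:
            writer = csv.writer(file, **self.csv_writer_kwargs)
        ...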
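
To see why a single dict is easier to change, here is a small sketch that runs without Luigi. The helper write_rows is hypothetical and stands in for the body of run, and writer_kwargs plays the role of the dict stored in the Parameter. Both names are assumptions made for illustration only.

```python
import csv
import io


def write_rows(rows, writer_kwargs):
    """Write rows as CSV text, forwarding every option in writer_kwargs to csv.writer."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, **writer_kwargs)
    writer.writerows(rows)
    return buffer.getvalue()


rows = [["id", "name"], [1, "hoge"]]

# Tab-separated output: only the dict changes, not the function signature.
tsv_text = write_rows(rows, {"delimiter": "\t"})

# Another csv.writer option is added the same way, without declaring a new parameter.
quoted_text = write_rows(rows, {"delimiter": ",", "quoting": csv.QUOTE_ALL})
```

Adding or removing a csv.writer option only touches the dict that is passed in, so the task definition itself stays unchanged.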

For dependencies that are likely to change, make the dependency's task class itself a Parameter

For example, suppose you have a workflow in which TaskB and TaskC depend on TaskA. If you are likely to build another workflow in which TaskB or TaskC runs on a different upstream task, it is better to write them as in Example 2 rather than Example 1 below.

Example 1


class TaskA(luigi.ExternalTask):
    param1 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...

class TaskC(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()

Example 2


class TaskA(luigi.ExternalTask):

    param1 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class TaskC(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()

Now suppose you also want to run TaskB and TaskC on the result of a new task, TaskA2. The following compares the modifications that each example requires.

Modification from Example 1


class TaskA(luigi.ExternalTask):

    param1 = luigi.Parameter()
    ...

class TaskA2(luigi.ExternalTask):

    param2 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...

class TaskC(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...


class TaskB2(TaskB):

    def requires(self):
        yield TaskA2(param2="foo")
    ...

class TaskC2(TaskC):

    def requires(self):
        yield TaskA2(param2="foo")
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()
        yield TaskB2()
        yield TaskC2()

Modification from Example 2


class TaskA(luigi.ExternalTask):

    param1 = luigi.Parameter()
    ...

class TaskA2(luigi.ExternalTask):

    param2 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class TaskC(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()
        yield TaskB(
            required_task=TaskA2,
            params=dict(param2="foo"))
        yield TaskC(
            required_task=TaskA2,
            params=dict(param2="foo"))

In this way, with the form used in Example 2, all you have to do is rewrite All. You do not have to define extra classes such as TaskB2 or TaskC2, as you do in Example 1.

How to keep the results consistent

Everything below concerns the complete method. By default, the complete method of the Task class simply reports whether the exists method of the output Target returns True. If getting consistent output is a high priority, it is better to define a complete method that follows the three points below rather than rely on the default. Note, however, that this increases the number of situations in which the workflow stops or recalculates, so it may not suit every application.

complete checks that complete returns True for every dependency

If every task in the workflow is defined so that its complete method calls the complete methods of its dependencies, then calling complete on the task at the end of the workflow recursively calls complete across the entire workflow. If complete returns False anywhere, every task downstream of that point is executed again. Incidentally, the complete method of WrapperTask does exactly this: it returns True if complete returns True for all of its dependencies.
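
The recursion can be sketched without Luigi as follows. The helpers flatten_requirements and dependencies_complete are hypothetical and duck-typed: they only assume objects with requires() and complete() methods, as Luigi tasks have. StubTask is a stand-in for a real task, and its output_exists method is an assumption that replaces the default check on the output Target.

```python
def flatten_requirements(required):
    """Yield every task in a requires() result (a task, list, tuple, dict, generator or None)."""
    if required is None:
        return
    if isinstance(required, dict):
        required = required.values()
    if hasattr(required, "complete"):
        yield required
        return
    for item in required:
        yield from flatten_requirements(item)


def dependencies_complete(task):
    """Return True only if complete() is True for every direct dependency of task."""
    return all(dep.complete() for dep in flatten_requirements(task.requires()))


class StubTask:
    """Hypothetical stand-in for a Luigi task, so the sketch runs without Luigi."""

    def __init__(self, name, done, deps=()):
        self.name = name
        self.done = done
        self.deps = list(deps)

    def requires(self):
        return self.deps

    def output_exists(self):
        return self.done

    def complete(self):
        # Check upstream first; each dependency repeats the same check recursively.
        return dependencies_complete(self) and self.output_exists()


raw = StubTask("raw", done=False)
cleaned = StubTask("cleaned", done=True, deps=[raw])
report = StubTask("report", done=True, deps=[cleaned])

# report and cleaned have outputs, but raw does not, so the whole chain is incomplete.
report_is_complete = report.complete()
```

In a real Luigi task, the last line of complete would call the parent class's complete method instead of output_exists.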

complete checks the input and output timestamps

Make the complete method return False if the output is older than the input. That way, after you correct part of an intermediate result, the correction propagates to every part of the workflow that depends on it.
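
As a minimal sketch of the timestamp comparison, assuming that inputs and outputs are files on the local file system, the hypothetical helper below compares modification times. The name output_is_up_to_date and its arguments are assumptions for illustration. Remote targets would need a different way to obtain timestamps.

```python
import os


def output_is_up_to_date(input_paths, output_path):
    """Return True if output_path exists and is at least as new as every input file."""
    if not os.path.exists(output_path):
        return False
    output_mtime = os.path.getmtime(output_path)
    for path in input_paths:
        # A missing input, or an input modified after the output, means a rerun is needed.
        if not os.path.exists(path) or os.path.getmtime(path) > output_mtime:
            return False
    return True
```

A task's complete method could call this helper with the paths of its input and output targets and return False whenever it does. Because the comparison relies on file modification times, inputs and outputs written within the timestamp resolution of the file system are treated as up to date.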

Put verification of the calculation result in the complete method

If a strange result appears in the middle of the workflow, execution stops at that point. Also, even if a result has already been written, re-running the workflow recomputes it when it fails the verification.
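
What counts as a valid result depends on the calculation, so the following is only a sketch under assumed rules: the output is a local CSV file that must have a given header and a minimum number of data rows. The helper csv_output_is_valid and its parameters expected_header and min_rows are hypothetical.

```python
import csv
import os


def csv_output_is_valid(path, expected_header, min_rows=1):
    """Return True if the CSV at path has the expected header and at least min_rows data rows."""
    if not os.path.exists(path):
        return False
    with open(path, newline="") as file:
        reader = csv.reader(file)
        header = next(reader, None)  # None when the file is empty
        if header != list(expected_header):
            return False
        data_rows = sum(1 for _ in reader)
    return data_rows >= min_rows
```

A complete method could return True only when both the default existence check and this validation pass, so that an invalid output file is treated the same as a missing one.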
