[PYTHON] Luigi Reverse Lookup Reference

Notes on the 2.0 series of Luigi, the Python job manager.

Basic

Basic form of a Task

standard_task.py


import luigi


def do_something(line):
    # Placeholder for the real per-line processing (hypothetical helper)
    return line.upper()


class MyTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        # MyTask runs only after MyDependentTask's output exists
        return MyDependentTask(self.date)

    def run(self):
        with self.output().open('w') as outfile:
            with self.input().open('r') as infile:
                for line in infile:
                    # Strip the trailing newline before processing
                    ret = do_something(line.rstrip('\n'))
                    outfile.write(ret)
                    outfile.write('\n')

    def output(self):
        return luigi.LocalTarget('./out2_{0}.txt'.format(self.date.isoformat()))


class MyDependentTask(luigi.Task):
    date = luigi.DateParameter()

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write("line1\n")
            outfile.write("line2\n")

    def output(self):
        return luigi.LocalTarget('./out1_{0}.txt'.format(self.date.isoformat()))


if __name__ == '__main__':
    luigi.run()
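
To make the "run only what is missing" behavior concrete, here is a simplified plain-Python model that does not import Luigi. ToyTask and build are hypothetical names. The model assumes the rule Luigi applies by default: a task is complete when its output exists, and an incomplete task first makes sure its dependencies are complete. The real scheduler does much more (workers, retries, a central scheduler).

```python
import os
import tempfile


class ToyTask:
    """Simplified stand-in for a Luigi task: complete() means 'output file exists'."""

    def __init__(self, name, path, deps=()):
        self.name = name
        self.path = path
        self.deps = list(deps)

    def complete(self):
        return os.path.exists(self.path)

    def run(self):
        with open(self.path, 'w') as f:
            f.write(self.name + '\n')


def build(task, log):
    """Depth-first: finish dependencies, then run the task if its output is missing."""
    if task.complete():
        return
    for dep in task.deps:
        build(dep, log)
    task.run()
    log.append(task.name)


workdir = tempfile.mkdtemp()
dependent = ToyTask('MyDependentTask', os.path.join(workdir, 'out1.txt'))
main = ToyTask('MyTask', os.path.join(workdir, 'out2.txt'), deps=[dependent])

first_run = []
build(main, first_run)
second_run = []
build(main, second_run)
print(first_run)   # ['MyDependentTask', 'MyTask']
print(second_run)  # []
```

The second call does nothing because the final output already exists. This matches why re-running a Luigi command only executes tasks whose output is missing.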

I want to output a binary file

Use luigi.format.Nop as the target's format. For example, when writing a pickle:

import pickle

import luigi


class SomeTask(luigi.Task):
    def requires(self):
        return xxxx

    def run(self):
        some_obj = hoge()
        with self.output().open('w') as output:
            output.write(pickle.dumps(some_obj, protocol=pickle.HIGHEST_PROTOCOL))

    def output(self):
        return luigi.LocalTarget(
            format=luigi.format.Nop,
            path='xxxxxxxx')


class NextTask(luigi.Task):
    def requires(self):
        return SomeTask()

    def run(self):
        with self.input().open('r') as infile:
            ret = pickle.load(infile)
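
Nop is needed because pickle.dumps returns bytes, and a text-mode file object refuses bytes. The stdlib-only sketch below uses in-memory files as stand-ins for Luigi targets. Treating the default (text) format like a StringIO and Nop like a BytesIO is a simplifying assumption.

```python
import io
import pickle

some_obj = {'answer': 42, 'items': [1, 2, 3]}
payload = pickle.dumps(some_obj, protocol=pickle.HIGHEST_PROTOCOL)

# Text-mode file: roughly what a default-format target gives you on Python 3
text_file = io.StringIO()
try:
    text_file.write(payload)
    text_write_failed = False
except TypeError:
    text_write_failed = True

# Binary file: what a Nop-format target gives you; the bytes go through as-is
binary_file = io.BytesIO()
binary_file.write(payload)
binary_file.seek(0)
restored = pickle.load(binary_file)

print(text_write_failed)     # True
print(restored == some_obj)  # True
```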

Read a gzipped file as input

Have the dependent Task's output() return a Target whose format is luigi.format.GzipFormat(). The file is then decompressed transparently when it is read.

Gzip the output file

As with input, pass luigi.format.GzipFormat() as the Target's format.

import luigi


class MyTask(luigi.Task):
    def run(self):
        with self.output().open('w') as output:
            # GzipFormat pipes the data through gzip, so write bytes (required on Python 3)
            output.write(b'aaaa')

    def output(self):
        return luigi.LocalTarget('./out.gz', format=luigi.format.GzipFormat())
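
GzipFormat writes a standard gzip stream because Luigi pipes the data through the gzip command. As a result, you can inspect the output outside Luigi with Python's gzip module. In this sketch the file is created with gzip.open as a stand-in for the task output. The temporary path is made up for the example.

```python
import gzip
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), 'out.gz')

# Stand-in for the task output: an ordinary gzip stream
with gzip.open(path, 'wb') as f:
    f.write(b'aaaa')

# Reading it back outside Luigi, e.g. to check what a task produced
with gzip.open(path, 'rb') as f:
    content = f.read()

# Every gzip file starts with the magic bytes 1f 8b
with open(path, 'rb') as f:
    magic = f.read(2)

print(content)               # b'aaaa'
print(magic == b'\x1f\x8b')  # True
```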

I want to output a pandas DataFrame

Use luigi.format.Nop as the output format and write the DataFrame as a pickle. Avoid to_csv and similar methods, because the column types are lost in the round trip.


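# Inside a Task whose output() uses format=luigi.format.Nop
# (requires `import pickle` at module level)
def run(self):
    result_df = do_something()
    with self.output().open('w') as output:
        output.write(pickle.dumps(result_df, protocol=pickle.HIGHEST_PROTOCOL))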

On the receiving side:


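# The upstream Task's output() must also use format=luigi.format.Nop
# (requires `import pickle` and `import pandas as pd` at module level)
def run(self):
    with self.input().open('r') as infile:
        input_df: pd.DataFrame = pickle.load(infile)
        do_something(input_df)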
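
The sketch below makes the type loss concrete without requiring pandas. It round-trips one record through the stdlib csv module and through pickle. CSV stores only text, so every value comes back as a string. pandas' read_csv re-infers numeric columns, but it leaves dates as strings unless you ask it to parse them.

```python
import csv
import datetime
import io
import pickle

rows = [{'id': 1, 'score': 0.5, 'day': datetime.date(2016, 1, 1)}]

# Round trip through CSV: everything comes back as str
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=['id', 'score', 'day'])
writer.writeheader()
writer.writerows(rows)
buf.seek(0)
from_csv = [dict(row) for row in csv.DictReader(buf)]

# Round trip through pickle: the original types survive
from_pickle = pickle.loads(pickle.dumps(rows, protocol=pickle.HIGHEST_PROTOCOL))

print(from_csv[0])     # {'id': '1', 'score': '0.5', 'day': '2016-01-01'}
print(from_pickle[0])  # {'id': 1, 'score': 0.5, 'day': datetime.date(2016, 1, 1)}
```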

A task that only runs other tasks

luigi.WrapperTask implements neither run nor output.

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        return [FugaTask(), BarTask(), BuzTask(), FooTask()]
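
A WrapperTask counts as complete when every task returned by requires() is complete. The plain-Python model below mimics that rule with hypothetical Dep and ToyWrapper classes so that it runs without Luigi. It is not Luigi's implementation.

```python
class Dep:
    """Hypothetical stand-in for a dependency with a known completion state."""

    def __init__(self, name, done):
        self.name = name
        self.done = done

    def complete(self):
        return self.done


class ToyWrapper:
    """Mimics the idea behind luigi.WrapperTask: no run/output, complete iff all requirements are."""

    def requires(self):
        return [Dep('FugaTask', True), Dep('BarTask', True),
                Dep('BuzTask', False), Dep('FooTask', True)]

    def complete(self):
        return all(dep.complete() for dep in self.requires())


wrapper = ToyWrapper()
pending = [dep.name for dep in wrapper.requires() if not dep.complete()]
print(wrapper.complete())  # False
print(pending)             # ['BuzTask']
```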

Execute dependent tasks in parallel

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        return [MyDepTask1(), MyDepTask2(), MyDepTask3()]

class MyDepTask1(luigi.Task):
    priority = 100

    # (the rest is omitted)

Then add --workers 2 (or more) to the startup command. The scheduler checks the priority of each task and runs tasks starting from the highest priority. The default priority is 0.
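
The toy scheduler below gives a rough picture of how priority affects ordering. It hands the highest-priority pending tasks to the free workers in rounds. The run_order function and the round-based model are assumptions for illustration. Luigi's scheduler assigns work as workers become free, and it also takes dependencies and resources into account.

```python
import heapq


def run_order(tasks, workers):
    """Toy scheduler: each round, start the highest-priority pending tasks on the free workers.

    tasks: list of (name, priority); ties keep the input order.
    Returns a list of rounds, each a list of task names started together.
    """
    # heapq is a min-heap, so negate the priority to pop the highest first
    heap = [(-priority, index, name) for index, (name, priority) in enumerate(tasks)]
    heapq.heapify(heap)
    rounds = []
    while heap:
        batch = [heapq.heappop(heap)[2] for _ in range(min(workers, len(heap)))]
        rounds.append(batch)
    return rounds


tasks = [('MyDepTask1', 100), ('MyDepTask2', 0), ('MyDepTask3', 50)]
print(run_order(tasks, workers=2))  # [['MyDepTask1', 'MyDepTask3'], ['MyDepTask2']]
```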

Execute dependent tasks in sequence

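If you don't define Luigi-style dependencies between the tasks but still want to process them serially, you can yield them from requires(). Note that Luigi collects every task from the requires() generator at once. As a result, the tasks run one at a time only with a single worker (the default), and even then their order is up to the scheduler. To strictly enforce the order, yield each task from run() instead (dynamic dependencies), which suspends the task until the yielded task is complete.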

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        yield MyDepTask1()
        yield MyDepTask2()
        yield MyDepTask3()

Depend on a task without running it

If you wrap the task object with luigi.task.externalize, Luigi will not run it. It only checks whether the task's output has been generated.

import luigi
from luigi.task import externalize


class MyTask(luigi.Task):
    def requires(self):
        return externalize(MyDependencyTask())

    def run(self):
        print('Someone has finished MyDependencyTask')

Manually retry a failed job

You can simply run the command again if the task is PENDING in the Visualiser or no longer shown there (released from the scheduler). Only tasks in the dependency tree whose output has not been generated will be executed.

Automatically retry failed jobs

By default failed tasks are not retried, so set the following four items in the configuration file. Note: the retry-related settings changed in version 2.5.

luigi.cfg


[core]
worker-keep-alive: true
max-reschedules: 20

[scheduler]
disable-num-failures: 10
retry-delay: 300

Stop retrying if retries continue to fail

A task is disabled when it fails disable-num-failures times within the period given by disable-window-seconds.

luigi.cfg


[scheduler]
disable-num-failures: 20
disable-window-seconds: 3600
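
The disable rule can be modeled as a sliding window over failure timestamps. The FailureWindow class below is a hypothetical simplification, not Luigi's scheduler code, and the timestamps are plain seconds supplied by the caller.

```python
from collections import deque


class FailureWindow:
    """Toy model of disable-num-failures / disable-window-seconds:
    report 'disable' once max_failures failures fall within window_seconds."""

    def __init__(self, max_failures, window_seconds):
        self.max_failures = max_failures
        self.window_seconds = window_seconds
        self.failures = deque()

    def record_failure(self, now):
        self.failures.append(now)
        # Forget failures older than the window
        while self.failures and now - self.failures[0] > self.window_seconds:
            self.failures.popleft()
        return len(self.failures) >= self.max_failures  # True means "disable the task"


# Failing every 300 s: at most 13 failures fit in one hour, so this model never disables
window = FailureWindow(max_failures=20, window_seconds=3600)
slow = [window.record_failure(t) for t in range(0, 300 * 30, 300)]
print(any(slow))  # False

# Failing every 60 s: the 20th failure (index 19) trips the rule
window = FailureWindow(max_failures=20, window_seconds=3600)
fast = [window.record_failure(t) for t in range(0, 60 * 30, 60)]
print(fast.index(True))  # 19
```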

Continue to wait for external task output

If you set retry-external-tasks: true in luigi.cfg, ExternalTasks are retried as well. retry-delay is a scheduler-wide setting and cannot be set per task.

Collect task processing time

You can register hooks with the luigi.Task.event_handler decorator. If you collect task elapsed time in a handler for PROCESSING_TIME, you only need to implement it in one place.

import logging
import luigi

logger = logging.getLogger(__name__)

@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def on_processing_time(task, duration):
    logger.debug('Task %s took %.1f sec', task, duration)
    # Send the value somewhere to collect metrics
    # ...
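
One way to fill in the "collect metrics" part is a callable object that takes the same (task, duration) arguments as the handler above and aggregates them in memory. ProcessingTimeCollector is a hypothetical name. A stand-in class replaces a real luigi.Task so that the sketch runs on its own.

```python
from collections import defaultdict


class ProcessingTimeCollector:
    """Hypothetical in-memory sink for (task, duration) pairs from a PROCESSING_TIME handler."""

    def __init__(self):
        self.durations = defaultdict(list)

    def __call__(self, task, duration):
        # Group by the task's class name, e.g. all MyTask runs together
        self.durations[type(task).__name__].append(duration)

    def summary(self):
        # name -> (number of runs, total seconds)
        return {name: (len(values), sum(values)) for name, values in self.durations.items()}


class MyTask:  # stand-in for a real luigi.Task instance
    pass


collector = ProcessingTimeCollector()
collector(MyTask(), 1.5)
collector(MyTask(), 2.5)
print(collector.summary())  # {'MyTask': (2, 4.0)}
```

The decorator only takes a callable, so something like luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)(collector) should register the object directly.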

AWS

http://luigi.readthedocs.org/en/stable/_modules/luigi/s3.html

Read a file that an external task put in S3

Use luigi.s3.S3PathTask.

class MyTask(luigi.Task):
    def requires(self):
        return luigi.s3.S3PathTask('s3://xxxxxxx')

If the file is gzipped

class MyTask(luigi.Task):
    def requires(self):
        return GzipS3FileTask('s3://hoge/fuga.gz')

    def run(self):
        # GzipFormat decompresses transparently, so this reads the uncompressed data
        with self.input().open('r') as infile:
            data = infile.read()

class GzipS3FileTask(luigi.s3.S3PathTask):
    path = luigi.Parameter()

    def output(self):
        return luigi.s3.S3Target(self.path, format=luigi.format.GzipFormat())

Output the result to S3

Return a luigi.s3.S3Target from output() and write to it.

class MyTask(luigi.Task):
    def run(self):
        with self.output().open('w') as output:
            output.write('Hey')

    def output(self):
        return luigi.s3.S3Target('s3://hoge/fuga.txt')

Use STS (Security Token Service) connection to access S3

Create an S3Client with the temporary STS credentials and pass it as the client argument of S3Target.

class MyS3FileTask(luigi.s3.S3PathTask):
    path = luigi.Parameter()

    def output(self):
        # Pass the temporary credentials obtained by assuming the role
        client = luigi.s3.S3Client(
            aws_access_key_id=xxxxx,
            aws_secret_access_key=yyyyy,
            security_token=zzzz)
        return luigi.s3.S3Target('s3://xxxx', client=client)

Send error notifications to SNS

Configure it like this:

luigi.cfg


[core]
error-email: arn:aws:sns:ap-northeast-1:0000000000:sns-LuigiError

[email]
type: sns
# true: send notifications even when the job is run manually
force-send: true

Pass AWS_DEFAULT_REGION and other settings to the startup command as needed. You don't need to specify credentials when using the IAM role of an EC2 instance.

AWS_DEFAULT_REGION=ap-northeast-1 python sns_test.py Invoke
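
Be careful with inline comments in luigi.cfg. With its default settings, Python's configparser does not treat a `#` after a value as a comment, so the comment becomes part of the value. The sketch below assumes Luigi reads luigi.cfg through the standard configparser with those defaults. Keeping comments on their own lines avoids the problem either way.

```python
import configparser

WITH_INLINE_COMMENT = """
[email]
type: sns
force-send: true #True when you want to send even during manual execution
"""

COMMENT_ON_OWN_LINE = """
[email]
type: sns
# true: send notifications even when the job is run manually
force-send: true
"""

parser = configparser.ConfigParser()  # default: no inline comment prefixes
parser.read_string(WITH_INLINE_COMMENT)
raw_value = parser.get('email', 'force-send')
try:
    parser.getboolean('email', 'force-send')
    inline_parses_as_bool = True
except ValueError:
    inline_parses_as_bool = False
print(repr(raw_value))        # 'true #True when you want to send even during manual execution'
print(inline_parses_as_bool)  # False

parser = configparser.ConfigParser()
parser.read_string(COMMENT_ON_OWN_LINE)
fixed_value = parser.getboolean('email', 'force-send')
print(fixed_value)  # True
```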

GCP

GCP credentials are passed via the GOOGLE_APPLICATION_CREDENTIALS environment variable, which points to a credentials JSON file.

Read a GCS file as input

Use luigi.contrib.gcs.GCSTarget.

Creating a GCSTarget instance involves network access, but the code does not expect that access to fail. It is therefore safer to retry when a 503 is returned.
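
The retry can be a small helper with exponential backoff. retry_on_503 and TransientHTTPError are hypothetical names. In real code, adapt the except clause to the exception your GCS client raises; for example, googleapiclient's HttpError carries the status in e.resp.status. The sleep function is injectable so that the example runs instantly.

```python
import time


class TransientHTTPError(Exception):
    """Hypothetical exception carrying an HTTP status code."""

    def __init__(self, status):
        super().__init__('HTTP {}'.format(status))
        self.status = status


def retry_on_503(func, attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call func(), retrying with exponential backoff while it raises a 503 error."""
    for attempt in range(attempts):
        try:
            return func()
        except TransientHTTPError as e:
            # Re-raise non-503 errors, and give up after the last attempt
            if e.status != 503 or attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)  # 1 s, 2 s, 4 s, ...


# Simulated call that fails twice with 503 and then succeeds
calls = []


def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise TransientHTTPError(503)
    return 'target'


delays = []
result = retry_on_503(flaky, sleep=delays.append)
print(result)  # target
print(delays)  # [1.0, 2.0]
```

Inside a task, a call such as retry_on_503(lambda: GCSTarget(self.path)) would wrap the instance creation, once the except clause matches your client's error type.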

Output to GCS

Return a luigi.contrib.gcs.GCSTarget from output() and write to it.

import luigi
from luigi.contrib.gcs import GCSTarget


class MyTask(luigi.Task):
    def requires(self):
        return GCSPathTask(path='gs://hoge/fuga.txt')

    def run(self):
        with self.input().open('r') as infile:
            # Do something with the input (placeholder: read it as-is)
            data = infile.read()

        with self.output().open('w') as outfile:
            # Write something to the output (placeholder: copy the input unchanged)
            outfile.write(data)

    def output(self):
        return GCSTarget('gs://hoge/fuga_result.txt')


class GCSPathTask(luigi.ExternalTask):
    path = luigi.Parameter()

    def output(self):
        return GCSTarget(self.path)

Run a BigQuery load job

luigi.contrib.bigquery is awkward to use, so it is better to write your own. In particular, with BigQueryTarget you cannot re-run a task without first deleting the table.
