Smoother pipeline processing with Luigi! Introducing gokart

What is this?

This is a summary article about Luigi's wrapper library gokart.

The motivation behind its development and the basic usage are explained very carefully on M3's blog, and that is the place to read for the basics, but I wanted a reverse-lookup style reference, so I wrote this article.

Also, I will not go into much detail about the features of Luigi itself.

What is Luigi?

An open-source pipeline framework developed by Spotify, implemented in Python. By inheriting `luigi.Task` and writing just the following three methods, you can easily build a workflow:

- `requires()`: the dependent Tasks
- `run()`: the processing to execute
- `output()`: the output destination
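
For reference, here is a minimal sketch of a plain Luigi task (the task and file names are arbitrary):

    import luigi


    class HelloTask(luigi.Task):
        def requires(self):
            # no dependencies; return other Task(s) here if there are any
            return []

        def output(self):
            # where the result is written
            return luigi.LocalTarget('hello.txt')

        def run(self):
            # the processing to execute
            with self.output().open('w') as f:
                f.write('hello')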

The origin of the name

Apparently, Luigi is named after Mario's pipe-running friend from Super Mario.

What is gokart?

Gokart is a wrapper library that makes Luigi easier to use.

The name probably comes from Mario (Kart).

Basics

The features of gokart == 0.3.6 are summarized below.

Building a Task

When creating a task, inherit gokart.TaskOnKart instead of luigi.Task.

    import gokart
    import pandas as pd
    from sklearn.datasets import load_iris


    class TaskA(gokart.TaskOnKart):
        def run(self):
            data = pd.DataFrame(load_iris()['data'])
            self.dump(data)


    class TaskB(gokart.TaskOnKart):
        def requires(self):
            return TaskA()

        # output is optional
        def output(self):
            return self.make_target('data.pkl')

        def run(self):
            df = self.load()
            self.dump(df)

The basic usage is the same as Luigi, but since all you need to write is `self.dump(object_to_save)`, it is considerably simpler than implementing the same processing with Luigi alone. The `def output(self)` can also be omitted, in which case the output is saved in pickle format.

Run

Execute as follows.

    gokart.run(['TaskB', '--local-scheduler'])
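
In a script, this call typically goes under a main guard (a minimal sketch; the file name is an assumption, and the entry Task is taken from the example above):

    # main.py (the file name is arbitrary)
    import gokart


    if __name__ == '__main__':
        gokart.run(['TaskB', '--local-scheduler'])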

When executed, the object will be saved under resources as shown below.


    resources
    ├── data_3eba828ec57403111694b0fb1e3f62e0.pkl
    └── log
        ├── module_versions
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.txt
        ├── processing_time
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
        ├── task_log
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
        └── task_params
            └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl

The specified file name is saved with a hash value appended. Since the hash is determined by the Task's parameters, the Task is re-executed whenever its parameters change, unlike with **Luigi alone**. This is another of gokart's benefits. (As described later, it is also possible to save without appending the hash value.)
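
As a minimal sketch (this `FilterData` Task and its `threshold` parameter are hypothetical), changing the parameter value changes the hash, so the output gets a new file name and the Task runs again:

    import luigi
    import gokart


    class FilterData(gokart.TaskOnKart):
        # hypothetical example: changing `threshold` changes the hash in the
        # output file name, so gokart re-executes the Task instead of
        # reusing the cached result
        threshold = luigi.IntParameter(default=5)

        def run(self):
            self.dump([x for x in range(10) if x > self.threshold])

Running it again with a different value, for example `gokart.run(['FilterData', '--threshold=7', '--local-scheduler'])`, should write a new output file instead of reusing the previous one.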

Under `log`, the following are saved:

- the versions of the modules used
- the processing time
- the log output by the logger
- the applied parameters

Specifying the save destination path

By default, output is saved under `resources`, but the destination directory can be changed by specifying the following in the configuration file:

    [TaskOnKart]
    workspace_directory=./output

load

Combine DataFrames and then load

Of course, you can load a saved DataFrame with `self.load()`, but if you want to load a set of DataFrames like `[df1, df2, df3, ...]`, you can use `self.load_data_frame()` to load them concatenated vertically into a single DataFrame.

You can also optionally pass `required_columns` as a set of column names; an exception is raised if any of those columns is missing from the DataFrame being loaded.


    import gokart
    import pandas as pd


    class TaskA(gokart.TaskOnKart):
        def run(self):
            df1 = pd.DataFrame([1, 2], columns=['target'])
            df2 = pd.DataFrame([3, 4], columns=['target'])
            df3 = pd.DataFrame([5, 6], columns=['target'])
            self.dump([df1, df2, df3])


    class TaskB(gokart.TaskOnKart):
        def requires(self):
            return TaskA()

        def run(self):
            # the DataFrames are loaded after being concatenated vertically
            df = self.load_data_frame(required_columns={'target'})
            self.dump(df)

Specify by key and load

If there are multiple dependent Tasks, you can define them in dictionary form and load each one by its key, as shown below. Luigi alone can also load from multiple Tasks, but it does not handle the dictionary form as conveniently, so using the dictionary format can improve the readability of your code.


    import gokart
    from sklearn.linear_model import LogisticRegression


    class TrainModel(gokart.TaskOnKart):
        def requires(self):
            # LoadData and LoadTarget are assumed to be defined elsewhere
            return {'data': LoadData(), 'target': LoadTarget()}

        def run(self):
            data = self.load('data')
            target = self.load('target')

            model = LogisticRegression()
            model.fit(data, target)

            self.dump(model)

Sequentially load

You can use `self.load_generator()` to load the outputs of dependent Tasks one at a time and process them sequentially.


    import gokart
    from sklearn.datasets import load_iris
    from sklearn.datasets import load_wine


    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']
            self.dump(data)


    class LoadIrisData(gokart.TaskOnKart):
        def run(self):
            data = load_iris()['data']
            self.dump(data)


    class LoadGenerator(gokart.TaskOnKart):
        def requires(self):
            return [LoadWineData(), LoadIrisData()]

        def run(self):
            for data in self.load_generator():
                print(f'data_shape={data.shape}')
                # data_shape=(178, 13)
                # data_shape=(150, 4)

output

Save without hash value

If you pass `use_unique_id=False`, the hash value will not be appended to the file name.

    def output(self):
        return self.make_target('data.pkl', use_unique_id=False)

Save a model that spans multiple files

For libraries such as gensim and TensorFlow, where a model is saved across multiple files, you can use make_model_target as shown below to compress and save everything at once.

    def output(self):
        return self.make_model_target(
            'model.zip',
            save_function=gensim.models.Word2Vec.save,
            load_function=gensim.models.Word2Vec.load)

By passing save and restore functions as parameters like this, the model and its load_function are compressed and saved together as a set (here in zip format), and the calling Task can restore the model with `self.load()` without having to worry about the details.

Save a huge DataFrame

If you use make_large_data_frame_target as shown below, the DataFrame is split into chunks of records of at most the size specified by max_byte, compressed into a single file, and saved.

    def output(self):
        return self.make_large_data_frame_target('large_df.zip', max_byte=2**10)

Incidentally, this uses the make_model_target described above internally.

Save DataFrame in various formats

If you want to save a DataFrame in a format other than pickle, simply give the file name that format's extension, and the internal FileProcessor will convert the DataFrame to the target format and save it.

The currently supported formats are:

- pickle
- npz
- gz
- txt
- csv
- tsv
- json
- xml


    import gokart
    import pandas as pd
    from sklearn.datasets import load_wine


    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']
            self.dump(data)


    class ToCSV(gokart.TaskOnKart):
        def requires(self):
            return LoadWineData()

        def output(self):
            # give the file name the extension of the format you want to save in
            return self.make_target('./wine.csv')

        def run(self):
            df = pd.DataFrame(self.load())
            self.dump(df)

Specify the save destination as GCS or S3

If the workspace_directory path in the configuration file starts with `gs://`, all output is uploaded to GCS; if it starts with `s3://`, all output is uploaded to S3.

    [TaskOnKart]
    # with a gs:// or s3:// prefix, all outputs are saved in the cloud
    workspace_directory=gs://resources/

This is very convenient because, unlike with Luigi alone, you can switch the save destination without modifying any code.

Other

Specify Parameter from environment variable

By writing `parameter=${ENVIRONMENT_VARIABLE}` in the configuration file, you can set a Task's Parameter from an environment variable.

This is very useful when you want to separate test from production, or when you want to change, per execution environment, whether to save to the cloud.


    [TaskOnKart]
    workspace_directory=${WORK_SPACE}
    
    [feature.LoadTrainTask]
    is_test=${IS_TEST}

And in `.zshrc`:


    export IS_TEST=False
    datetime=`date "+%m%d%H%Y"`
    export WORK_SPACE="gs://data/output/${datetime}"

Personally, I like to run things on a small scale locally for verification before running them at full scale on GCE, so being able to share a common luigi.cfg between the two is very convenient.

Take an instance of Task as Parameter

If you use `gokart.TaskInstanceParameter` (or `gokart.ListTaskInstanceParameter`), a Task can take another Task as a Parameter. This lets you create Tasks that do not depend on one specific Task, making the same Task reusable and increasing the flexibility of your code.


    import gokart
    from sklearn.datasets import load_wine
    from sklearn.linear_model import LogisticRegression


    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']
            self.dump(data)


    class LoadWineTarget(gokart.TaskOnKart):
        def run(self):
            target = load_wine()['target']
            self.dump(target)


    class Trainer(gokart.TaskOnKart):
        # takes Tasks as parameters
        data_task = gokart.TaskInstanceParameter(description='data for train')
        target_task = gokart.TaskInstanceParameter(description='target for train')

        def requires(self):
            return {'data': self.data_task, 'target': self.target_task}

        def run(self):
            data = self.load('data')
            target = self.load('target')

            model = LogisticRegression()
            model.fit(data, target)

            self.dump(model)


    class ExecuteTrain(gokart.TaskOnKart):
        def requires(self):
            # inject the concrete Tasks here
            return Trainer(data_task=LoadWineData(), target_task=LoadWineTarget())

        def run(self):
            trained_model = self.load()
            self.dump(trained_model)
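
Because `Trainer` does not depend on any specific Task, it could be reused with other data sources as well. For example (`LoadIrisData` and `LoadIrisTarget` are hypothetical Tasks, not defined in this article):

    class ExecuteIrisTrain(gokart.TaskOnKart):
        def requires(self):
            # LoadIrisData / LoadIrisTarget are hypothetical Tasks, defined
            # analogously to LoadWineData / LoadWineTarget above
            return Trainer(data_task=LoadIrisData(), target_task=LoadIrisTarget())

        def run(self):
            self.dump(self.load())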

Notify Slack

You can send notifications to Slack by writing the following in the configuration file. From a security standpoint, it is better to define the token as an environment variable rather than hardcoding it.

    [SlackConfig]
    token=${SLACK_TOKEN}    
    channel=channel_name
    to_user=hase_hiro
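
For example, following the `.zshrc` pattern above (the token value is a placeholder):

    export SLACK_TOKEN=your-slack-api-token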

Finally

I would appreciate it if you could point out any differences in behavior.
