[PYTHON] Reibungslosere Pipeline-Verarbeitung mit Luigi! Gokart vorstellen

Was ist das?

Dies ist ein zusammenfassender Artikel über Luigis Wrapper-Bibliothek gokart.

Die Motive für Entwicklung und grundlegende Verwendung sind in M3s Blog sehr sorgfältig zusammengefasst, und die grundlegende Verwendung ist Es ist eine Geschichte, die Sie hier lesen sollten, aber ich wollte sie als umgekehrte Referenz zusammenfassen, also habe ich daraus einen Artikel gemacht.

Außerdem werde ich nicht viel über die Funktionen von Luigi selbst erklären.

Was ist Luigi?

Eine Art von OSS für das von Spotify entwickelte Pipeline-Framework. In Python implementiert und "luigi.Task" erben

--requires () : Abhängige Aufgabe --run () : Auszuführender Prozess --output () : Ausgabeziel

Sie können ganz einfach einen Workflow erstellen, indem Sie die drei Methoden schreiben.

Der Ursprung des Namens

Also it should be mentioned that Luigi is named after the pipeline-running friend of Super Mario.

anscheinend

Was ist gokart?

Gokart ist eine Wrapper-Bibliothek, die die Verwendung von Luigi vereinfacht.

Der Ursprung des Namens ist wahrscheinlich Mario (Kart).

Basic

Die Funktionen von gokart == 0.3.6 sind unten zusammengefasst.

Eine Aufgabe erstellen

Erben Sie beim Erstellen einer Aufgabe "gokart.TaskOnKart" anstelle von "luigi.Task".

    import gokart
    
    class TaskA(gokart.TaskOnKart):
        def run(self):
            data= pd.DataFrame(load_iris()['data'])
            self.dump(data)
    
    class TaskB(gokart.TaskOnKart):
        def reuires(self):
             return TaskA()
        
    		#Ausgabe ist optional
        def output(self):
            return self.make_target('data.pkl')
            
        def run(self):
            df =self.load()
        
            self.dump(df)

Die grundlegende Verwendung ist dieselbe wie bei Luigi, Sie müssen jedoch nur "self.dump (Objekt, das Sie speichern möchten)" ausführen, sodass dies im Vergleich zur alleinigen Verarbeitung mit Luigi allein erheblich vereinfacht werden kann. Außerdem kann die "def output (self)" weggelassen werden. In diesem Fall wird sie im "pickle" -Format gespeichert.

Lauf

Führen Sie wie folgt aus.

    gokart.run(['TaskB', '--local-scheduler'])

Bei der Ausführung wird das Objekt wie unten gezeigt unter "Ressourcen" gespeichert.


    resources
    ├── data_3eba828ec57403111694b0fb1e3f62e0.pkl
    └── log
        ├── module_versions
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.txt
        ├── processing_time
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
        ├── task_log
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
        └── task_params
            └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl

Der Hashwert wird an den angegebenen Dateinamen angehängt und gespeichert. Da der Hash-Wert durch die Parameter der Aufgabe bestimmt wird, wird die Aufgabe erneut ausgeführt, wenn die Parameter geändert werden, im Gegensatz zu ** Luigi allein. ** Dies ist auch einer der Vorteile von Gokart. (Obwohl es später beschrieben wird, ist es auch möglich, ohne Hinzufügen eines Hashwerts zu speichern.)

log hat

--Version des verwendeten Moduls --Verarbeitungszeit --log Ausgabe durch Logger

Ist gespeichert.

Angeben des Speicherzielpfads

Standardmäßig wird es unter "Ressourcen" gespeichert, aber das Speicherzielverzeichnis befindet sich in der Einstellungsdatei.


    [TaskOnKart]
    workspace_directory=./output

Sie kann durch Angabe von als geändert werden.

load

Kombinieren Sie DataFrames und laden Sie sie

Natürlich können Sie einen gespeicherten DataFrame auch mit "self.load ()" laden. Wenn Sie jedoch eine Reihe von DataFrames wie "[df1, df2, df3 ...]" laden möchten, können Sie "load_dataframe" verwenden. Sie können mehrere DataFrames in einem vertikal kombinierten Zustand laden.

Optional können Sie auch eine Spalte mit "set" angeben, um eine Ausnahme auszulösen, wenn diese Spalte im zu ladenden "DataFrame" nicht vorhanden ist.


    class TaskA(gokart.TaskOnKart):
        def run(self):
            df1 = pd.DataFrame([1,2], columns=['target'])
            df2 = pd.DataFrame([3,4], columns=['target'])
            df3 = pd.DataFrame([5,6], columns=['target'])
            self.dump([df1, df2, df3])
    
    class TaskB(gokart.TaskOnKart):
    		def requires(self):
    			return TaskA()
    
        def run(self):
    				#Geladen nach dem Concated
            df =self.load_data_frame(required_columns={'target'})
            self.dump(df)

Nach Schlüssel angeben und laden

Wenn mehrere abhängige Aufgaben vorhanden sind, können Sie die abhängigen Aufgaben im Wörterbuchformat definieren und wie unten gezeigt mit dem Schlüssel lesen. Luigi allein kann mehrere Aufgaben laden, unterstützt jedoch keine Wörterbuchformulare. Daher kann die Verwendung des Wörterbuchformats die Lesbarkeit des Codes verbessern.


    class TrainModel(gokart.TaskOnKart):
        def requires(self):
            return {'data': LoadData(), 'target': LoadTarget()}
        
        def run(self):
            data = self.load('data')
            target = self.load('target')
            
            model = LogisticRegression()
            model.fit(data, target)
            
            self.dump(model)

Nacheinander laden

Sie können self.load_generator verwenden, um Aufgaben nacheinander zu laden und zu verarbeiten.


    from sklearn.datasets import load_iris
    from sklearn.datasets import load_wine
    
    
    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']
            
            self.dump(data)
            
    class LoadIrisData(gokart.TaskOnKart):
        def run(self):
            data = load_iris()['data']
            
            self.dump(data)
    
    class LoadGenerator(gokart.TaskOnKart):
        def requires(self):
            return [LoadWineData(), LoadIrisData()]
        
        def run(self):
            for data in self.load_generator():
                print(f'data_shape={data.shape}')
                # data_shape=(178, 13)
                # data_shape=(150, 4)

output

Speichern ohne Hashwert

Wenn Sie "use_unique_id = False" festlegen, wird der Hashwert nicht an den Dateinamen angehängt.

    def output(self):
            return self.make_target('data.pkl', use_unique_id=False)

Speichern Sie ein Modell, das mehrere Dateien umfasst

Für Formate wie gensim und TensorFlow, in denen Modelle in mehreren Dateien gespeichert werden, können Sie "make_model_target" wie unten gezeigt verwenden, um sie alle gleichzeitig zu komprimieren und zu speichern.

    def output(self):
            return self.make_model_target(
                'model.zip', 
                save_function=gensim.model.Word2Vec.save,
                load_function=gensim.model.Word2Vec.load)

Durch Übergeben einer Funktion zum Speichern und Wiederherstellen als Parameter werden das Modell und load_function komprimiert und als Satz im Zip-Format (in diesem Fall) gespeichert, und die aufrufende Task ist besonders betroffen. Sie können das Modell mit self.load () wiederherstellen, ohne es zu müssen.

Speichern Sie einen riesigen DataFrame

Wenn Sie "make_large_data_frame_target" wie unten gezeigt verwenden, wird "DataFrame" für jede durch "max_byte" angegebene Kapazität in mehrere Datensätze unterteilt, in einen komprimiert und dann gespeichert.

    def output(self):
            return self.make_large_data_frame_target('large_df.zip', max_byte=2**10)

Das oben erwähnte "make_model_target" wird übrigens intern verwendet.

Speichern Sie DataFrame in verschiedenen Formaten

Wenn Sie "DataFrame" in ein anderes Format als "pickle" konvertieren und speichern möchten, fügen Sie einfach die Erweiterung dieses Formats hinzu, und der interne "FileProcessor" konvertiert es in das Zielformat und speichert es.

Derzeit unterstützte Formate sind

    - pickle
    - npz
    - gz
    - txt
    - csv
    - tsv
    - json
    - xml

ist.


    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']
            
            self.dump(data)
            
    class ToCSV(gokart.TaskOnKart):
        def requires(self):
            return LoadWineData()
        
        def output(self):
            #Definieren Sie die Erweiterung, die Sie im Suffix speichern möchten
            return self.make_target('./wine.csv')
        
        def run(self):
            df = pd.DataFrame(self.load())
            self.dump(df)

Geben Sie das Speicherziel als GCS oder S3 an

Wenn der in der Einstellungsdatei beschriebene Pfad von work_space_directory mit gs: // beginnt, werden alle Ausgabeergebnisse in GCS hochgeladen, und wenn es s3: // `ist, werden alle Ausgabeergebnisse in S3 hochgeladen.

    [TaskOnKart]
    #Präfix gs://Oder s3://Dann werden alle Ausgaben in der Cloud gespeichert
    workspace_directory=gs://resources/

Dies ist sehr praktisch, da Sie den Code im Gegensatz zu Luigi allein ohne Änderungen ändern können.

Andere

Geben Sie den Parameter aus der Umgebungsvariablen an

Sie können den Task-Parameter in der Umgebungsvariablen angeben, indem Sie "parameter = $ {Umgebungsvariable}" in die Einstellungsdatei schreiben.

Dies ist sehr praktisch, wenn Sie den Test von der Produktion trennen möchten oder wenn Sie ändern möchten, ob für jede auszuführende Umgebung in der Cloud gespeichert werden soll.


    [TaskOnKart]
    workspace_directory=${WORK_SPACE}
    
    [feature.LoadTrainTask]
    is_test=${IS_TEST}

zshrc


    export IS_TEST=False
    datetime=`date "+%m%d%H%Y"`
    export WORK_SPACE="gs://data/output/${datetime}"

Persönlich möchte ich es zur Bestätigung vor Ort ein wenig drehen, bevor ich es mit GCE fest drehe, aber luigi.cfg ist sehr nützlich, weil ich ein allgemeines verwenden möchte.

Nehmen Sie eine Instanz von Task als Parameter

Wenn Sie gokart. (List) TaskInstanceParameter verwenden, können Sie Task als Parameter von Task verwenden. Auf diese Weise können Sie dieselbe Aufgabe wiederverwenden, indem Sie eine Aufgabe erstellen, die nicht von einer bestimmten Aufgabe abhängt. Dies erhöht die Möglichkeit, flexibleren Code zu schreiben.


    from sklearn.datasets import  load_wine
    from sklearn.linear_model import LogisticRegression
    
    
    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']
            
            self.dump(data)
            
    class LoadWineTarget(gokart.TaskOnKart):
        def run(self):
            target = load_wine()['target']
            
            self.dump(target)
            
    
    class Trainer(gokart.TaskOnKart):
        #Nehmen Sie Aufgabe als Argument
        data_task = gokart.TaskInstanceParameter(description='data for train')
        target_task= gokart.TaskInstanceParameter(description='target for train')
        
        def requires(self):
            return {'data': self.data_task, 'target': self.target_task}
        
        def run(self):
            data = self.load('data')
            target = self.load('target')
            
            model = LogisticRegression()
            model.fit(data, target)
            
            self.dump(model)
    
            
    class ExcuteTrain(gokart.TaskOnKart):
        def requires(self):
            #Aufgabe injizieren
            return Trainer(data_task=LoadWineData(), target_task=LoadWineTarget())
        
        def run(self):
            trained_model = self.load()
            
            self.dump(trained_model)

Benachrichtigen Sie Slack

Es ist möglich, Slack zu benachrichtigen, indem Folgendes in die Einstellungsdatei geschrieben wird. Aus Sicherheitsgründen ist es besser, Token als Umgebungsvariable zu definieren, als solide zu schreiben.

    [SlackConfig]
    token=${SLACK_TOKEN}    
    channel=channel_name
    to_user=hase_hiro

Schließlich

Ich würde es begrüßen, wenn Sie auf Unterschiede im Verhalten hinweisen könnten.

Recommended Posts

Reibungslosere Pipeline-Verarbeitung mit Luigi! Gokart vorstellen
Datenpipeline-Aufbau mit Python und Luigi
Bildverarbeitung mit MyHDL
Datensätze mit Pandas verarbeiten (1)
Datensätze mit Pandas verarbeiten (2)
Parametereinstellung mit luigi (2)
Parametereinstellung mit luigi
Bildverarbeitung mit Python
Parallelverarbeitung mit Mehrfachverarbeitung
Bildverarbeitung mit PIL