[PYTHON] Traitement des pipelines plus fluide avec Luigi! Présentation de gokart

Qu'est-ce que c'est?

Ceci est un article de synthèse sur la bibliothèque de wrapper de Luigi gokart.

Les motifs de développement et d'utilisation de base sont résumés très attentivement dans le blog de M3, et l'utilisation de base est C'est une histoire que vous devriez lire ici, mais je voulais la résumer comme une référence inversée, alors j'en ai fait un article.

De plus, je n'expliquerai pas grand chose sur les fonctions de Luigi lui-même.

Qu'est-ce que Luigi?

Un type d'OSS pour le framework Pipeline développé par Spotify. Implémenté en Python, héritant de luigi.Task

--requires () : Tâche dépendante --run () : Processus à exécuter --ʻOutput () `: Destination de sortie

Vous pouvez facilement créer un flux de travail en écrivant simplement les trois méthodes.

L'origine du nom

Also it should be mentioned that Luigi is named after the pipeline-running friend of Super Mario.

Apparemment

Qu'est-ce que gokart?

Gokart est une bibliothèque de wrapper qui rend Luigi plus facile à utiliser.

L'origine du nom est probablement Mario (Kart).

De base

Les fonctions de gokart == 0.3.6 sont résumées ci-dessous.

Construire une tâche

Lors de la création d'une tâche, héritez de gokart.TaskOnKart au lieu de luigi.Task.

    import gokart
    
    class TaskA(gokart.TaskOnKart):
        def run(self):
            data= pd.DataFrame(load_iris()['data'])
            self.dump(data)
    
    class TaskB(gokart.TaskOnKart):
        def reuires(self):
             return TaskA()
        
    		#la sortie est facultative
        def output(self):
            return self.make_target('data.pkl')
            
        def run(self):
            df =self.load()
        
            self.dump(df)

L'utilisation de base est la même que Luigi, mais vous n'avez qu'à faire self.dump (objet que vous voulez sauvegarder), donc cela peut être beaucoup plus simple que de faire le même traitement avec Luigi seul. De plus, la sortie def (self) peut être omise, auquel cas elle sera sauvegardée au format pickle.

Courir

Exécutez comme suit.

    gokart.run(['TaskB', '--local-scheduler'])

Une fois exécuté, l'objet sera enregistré sous resources comme indiqué ci-dessous.


    resources
    ├── data_3eba828ec57403111694b0fb1e3f62e0.pkl
    └── log
        ├── module_versions
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.txt
        ├── processing_time
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
        ├── task_log
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
        └── task_params
            └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl

La valeur de hachage est attachée au nom de fichier spécifié et enregistrée. La valeur de hachage étant déterminée par les paramètres de la tâche, la tâche sera réexécutée si les paramètres sont modifiés, contrairement au cas de ** Luigi seul. ** C'est également l'un des avantages du gokart. (Bien que cela soit décrit plus tard, il est également possible de sauvegarder sans ajouter de valeur de hachage)

le journal a

--Version du module utilisé --Temps de traitement --log sortie par enregistreur

Est sauvé.

Spécification du PATH de destination de sauvegarde

Par défaut, il est enregistré sous resources, mais le répertoire de destination de sauvegarde se trouve dans le fichier de paramètres.


    [TaskOnKart]
    workspace_directory=./output

Il peut être modifié en spécifiant comme.

load

Combinez DataFrames puis chargez

Bien sûr, vous pouvez également charger un DataFrame enregistré avec self.load (), mais si vous voulez charger un ensemble de DataFrames comme [df1, df2, df3 ...], vous pouvez utiliser load_dataframe. Vous pouvez charger plusieurs DataFrames dans un état combiné verticalement.

Vous pouvez également éventuellement spécifier une colonne avec set pour lever une exception si cette colonne n'existe pas dans le DataFrame à charger.


    class TaskA(gokart.TaskOnKart):
        def run(self):
            df1 = pd.DataFrame([1,2], columns=['target'])
            df2 = pd.DataFrame([3,4], columns=['target'])
            df3 = pd.DataFrame([5,6], columns=['target'])
            self.dump([df1, df2, df3])
    
    class TaskB(gokart.TaskOnKart):
    		def requires(self):
    			return TaskA()
    
        def run(self):
    				#Chargé après avoir été concaté
            df =self.load_data_frame(required_columns={'target'})
            self.dump(df)

Spécifiez par clé et charge

S'il existe plusieurs tâches dépendantes, vous pouvez définir les tâches dépendantes au format dictionnaire et les lire avec la clé comme indiqué ci-dessous. Luigi seul peut charger plusieurs tâches, mais il ne prend pas en charge les formulaires de dictionnaire, donc l'utilisation du format de dictionnaire peut améliorer la lisibilité du code.


    class TrainModel(gokart.TaskOnKart):
        def requires(self):
            return {'data': LoadData(), 'target': LoadTarget()}
        
        def run(self):
            data = self.load('data')
            target = self.load('target')
            
            model = LogisticRegression()
            model.fit(data, target)
            
            self.dump(model)

Charger séquentiellement

Vous pouvez utiliser self.load_generator pour charger et traiter les tâches de manière séquentielle.


    from sklearn.datasets import load_iris
    from sklearn.datasets import load_wine
    
    
    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']
            
            self.dump(data)
            
    class LoadIrisData(gokart.TaskOnKart):
        def run(self):
            data = load_iris()['data']
            
            self.dump(data)
    
    class LoadGenerator(gokart.TaskOnKart):
        def requires(self):
            return [LoadWineData(), LoadIrisData()]
        
        def run(self):
            for data in self.load_generator():
                print(f'data_shape={data.shape}')
                # data_shape=(178, 13)
                # data_shape=(150, 4)

output

Enregistrer sans valeur de hachage

Si ʻuse_unique_id = False`, la valeur de hachage ne sera pas attachée au nom du fichier.

    def output(self):
            return self.make_target('data.pkl', use_unique_id=False)

Enregistrer un modèle qui s'étend sur plusieurs fichiers

Pour les formats tels que gensim et TensorFlow où les modèles sont enregistrés dans plusieurs fichiers, vous pouvez utiliser make_model_target comme indiqué ci-dessous pour les compresser et les enregistrer tous en même temps.

    def output(self):
            return self.make_model_target(
                'model.zip', 
                save_function=gensim.model.Word2Vec.save,
                load_function=gensim.model.Word2Vec.load)

En passant une fonction de sauvegarde / restauration comme paramètre, le modèle et load_function sont compressés et sauvegardés sous forme d'ensemble au format zip (dans ce cas), et la tâche appelante est particulièrement concernée. Vous pouvez restaurer le modèle avec self.load () sans avoir à le faire.

Enregistrez un énorme DataFrame

Si vous utilisez make_large_data_frame_target comme indiqué ci-dessous, DataFrame sera divisé en plusieurs enregistrements pour chaque capacité spécifiée par max_byte, compressé en un, puis sauvegardé.

    def output(self):
            return self.make_large_data_frame_target('large_df.zip', max_byte=2**10)

À propos, le make_model_target mentionné ci-dessus est utilisé en interne.

Enregistrer DataFrame dans différents formats

Si vous voulez convertir DataFrame dans un format autre que pickle et l'enregistrer, ajoutez simplement l'extension de ce format et le FileProcessor interne le convertira au format cible et l'enregistrera.

Les formats actuellement pris en charge sont

    - pickle
    - npz
    - gz
    - txt
    - csv
    - tsv
    - json
    - xml

est.


    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']
            
            self.dump(data)
            
    class ToCSV(gokart.TaskOnKart):
        def requires(self):
            return LoadWineData()
        
        def output(self):
            #Définissez l'extension que vous souhaitez enregistrer dans le suffixe
            return self.make_target('./wine.csv')
        
        def run(self):
            df = pd.DataFrame(self.load())
            self.dump(df)

Spécifiez la destination de sauvegarde comme GCS ou S3

Si le chemin de work_space_directory décrit dans le fichier de configuration commence par gs: //, tous les résultats de sortie seront téléchargés vers GCS, et s'il s'agit de s3: //, tous les résultats de sortie seront téléchargés vers S3.

    [TaskOnKart]
    #préfixe gs://Ou s3://Ensuite, toutes les sorties sont stockées dans le cloud
    workspace_directory=gs://resources/

C'est très pratique car vous pouvez changer le code sans aucune modification contrairement au cas de Luigi seul.

Autre

Spécifier le paramètre à partir de la variable d'environnement

Vous pouvez spécifier le paramètre de tâche dans la variable d'environnement en écrivant paramètre = $ {variable d'environnement} dans le fichier de configuration.

Ceci est très pratique lorsque vous souhaitez séparer le test de la production, ou lorsque vous souhaitez modifier la sauvegarde dans le cloud pour chaque environnement à exécuter.


    [TaskOnKart]
    workspace_directory=${WORK_SPACE}
    
    [feature.LoadTrainTask]
    is_test=${IS_TEST}

-. zshrc


    export IS_TEST=False
    datetime=`date "+%m%d%H%Y"`
    export WORK_SPACE="gs://data/output/${datetime}"

Personnellement, je voudrais le tourner un peu pour confirmation localement avant de le tourner étroitement avec GCE, mais luigi.cfg est très utile car je veux en utiliser un commun.

Prendre une instance de Task comme paramètre

Si vous utilisez gokart. (List) TaskInstanceParameter, vous pouvez prendre Task comme paramètre de Task. Cela vous permet de réutiliser la même tâche en créant une tâche qui ne dépend pas d'une tâche spécifique, augmentant ainsi la possibilité d'écrire un code plus flexible.


    from sklearn.datasets import  load_wine
    from sklearn.linear_model import LogisticRegression
    
    
    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']
            
            self.dump(data)
            
    class LoadWineTarget(gokart.TaskOnKart):
        def run(self):
            target = load_wine()['target']
            
            self.dump(target)
            
    
    class Trainer(gokart.TaskOnKart):
        #Prenez Task comme argument
        data_task = gokart.TaskInstanceParameter(description='data for train')
        target_task= gokart.TaskInstanceParameter(description='target for train')
        
        def requires(self):
            return {'data': self.data_task, 'target': self.target_task}
        
        def run(self):
            data = self.load('data')
            target = self.load('target')
            
            model = LogisticRegression()
            model.fit(data, target)
            
            self.dump(model)
    
            
    class ExcuteTrain(gokart.TaskOnKart):
        def requires(self):
            #Injecter la tâche
            return Trainer(data_task=LoadWineData(), target_task=LoadWineTarget())
        
        def run(self):
            trained_model = self.load()
            
            self.dump(trained_model)

Notifier Slack

Il est possible de notifier slack en écrivant ce qui suit dans le fichier de paramètres. Du point de vue de la sécurité, il est préférable de définir le jeton comme une variable d'environnement au lieu d'une écriture solide.

    [SlackConfig]
    token=${SLACK_TOKEN}    
    channel=channel_name
    to_user=hase_hiro

finalement

Je vous serais reconnaissant si vous pouviez signaler toute différence de comportement.

Recommended Posts

Traitement des pipelines plus fluide avec Luigi! Présentation de gokart
Construction de pipeline de données avec Python et Luigi
Traitement d'image avec MyHDL
Traitement des ensembles de données avec des pandas (1)
Traitement des ensembles de données avec des pandas (2)
Réglage des paramètres avec luigi (2)
Réglage des paramètres avec luigi
Traitement d'image avec Python
Traitement parallèle avec multitraitement
Traitement d'image avec PIL