[PYTHON] Pour améliorer la réutilisabilité et la maintenabilité des flux de travail créés avec Luigi

À propos de cet article

Dans cet article, basé sur mon expérience avec Luigi, un framework de gestion de flux de travail qui s'exécute sur Python, pour l'analyse de données, ne serait-il pas plus facile de maintenir un flux de travail comme celui-ci? Il décrit la méthode que je pensais. Certaines personnes peuvent penser qu'il est inapproprié de dire ci-dessous, mais il semble que de telles choses ne sont pas beaucoup discutées en premier lieu, donc je pense que ce sera une source de discussion. Je vais.

Il ne concerne pas les outils de Luigi, ses avantages et son utilisation de base. Certains de Qiita ont déjà écrit des articles, alors veuillez vous y référer (Effectuer une analyse de Big Data en utilisant le framework de contrôle de flux de données Luigi. / 453aeec7f4f420b91241), Gestion des flux de travail avec Luigi, etc.).

Comment utiliser le paramètre

Concernant la gestion des paramètres, indispensables pour contrôler le fonctionnement des tâches.

N'utilisez pas de variables d'instance autres que celles définies comme paramètre

De toute évidence, si vous n'utilisez pas Parameter, Luigi ne reconnaîtra pas la tâche que vous souhaitez exécuter avec différents paramètres comme étant différente. De plus, il est nécessaire d'écrire la méthode \ _ \ _ init__ pour définir la variable d'instance autre que Parameter lors du chargement du workflow, ce qui rend la description inutile et compliquée.

Remplacer tous les paramètres de la classe parente lors de la création d'une tâche en héritant de la tâche

Dans Luigi, lors de l'instanciation d'une tâche qui hérite d'une tâche en tant que tâche dépendante, tous les paramètres définis dans les variables de classe dans les classes héritées doivent être définis. Par exemple, supposons que vous définissiez un groupe de tâches comme suit.

class Parent(luigi.ExternalTask):
    hoge = luigi.Parameter()


class Child(Parent):
    foo = luigi.Parameter()


#MissingParameterException est levée
class All1(luigi.WrapperTask):
    def requires(self):
        yield Child(foo="foo")


#Sera exécuté
class All2(luigi.WrapperTask):
    def requires(self):
        yield Child(foo="foo", hoge="hoge")

All2 ci-dessus est viable, mais All1 essayant d'instancier un Child avec seulement foo set une valeur lève une MissingParameterException en essayant de le faire. En plus de la variable de classe enfant foo, vous devez également définir hoge, qui est définie comme une variable de classe Parent. Si tel est le cas, il serait plus gentil d'être explicite sur ce qui doit être défini comme suit.

class Child(Parent):
    foo = luigi.Parameter()
    hoge = luigi.Parameter()

Utiliser des groupes de paramètres complexes dans un dict

Par exemple, dans une tâche qui génère à l'aide de csv.writer, envisagez une situation dans laquelle vous souhaitez pouvoir exécuter des calculs qui modifient le comportement de csv.writer avec des arguments de mot-clé. Pour le moment, plutôt que d'avoir chaque argument mot-clé de csv.writer comme paramètre, il est plus clair et plus flexible de le modifier s'il est stocké dans un paramètre comme indiqué ci-dessous.

class OutputCSV(luigi.Task):

    csv_writer_kwargs = luigi.Parameter(default=dict(sep='\t'))
    ...
    def run(self):
        with open(self.output().fn, 'wb') as file:
            writer = csv.writer(file, **self.csv_writer_kwargs)
        ...

Les tâches dépendantes susceptibles de changer font de la classe de tâches dépendante elle-même un paramètre

Par exemple, supposons que vous ayez un flux de travail dans lequel TaskB et TaskC dépendent de TaskA. Maintenant, si vous êtes susceptible de créer un flux de travail qui utilise TaskB ou TaskC pour une autre tâche, il est préférable de faire quelque chose comme l'exemple 2 plutôt que l'exemple 1 ci-dessous.

Exemple 1


class TaskA(luigi.ExternalTask):
    param1 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...

class TaskC(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()

Exemple 2


class TaskA(luigi.ExternalTask):

    param1 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class TaskC(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()

Supposons maintenant que vous souhaitiez ajouter le processus d'exécution de TaskB et TaskC sur le résultat de TaskA2. Ce qui suit est une comparaison des corrections nécessaires pour les deux.

Modification de l'exemple 1


class TaskA(luigi.ExternalTask):

    param1 = luigi.Parameter()
    ...

class TaskA2(luigi.ExternalTask):

    param2 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...

class TaskC(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...


class TaskB2(TaskB):

    def requires(self):
        yield TaskA2(param2="foo")
    ...

class TaskC2(TaskC):

    def requires(self):
        yield TaskA2(param2="foo")
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()
        yield TaskB2()
        yield TaskC2()

Modification de l'exemple 2


class TaskA(luigi.ExternalTask):

    param1 = luigi.Parameter()
    ...

class TaskA2(luigi.ExternalTask):

    param2 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class TaskC(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()
        yield TaskB(
            required_task=A2,
            params=dict(param2="foo"))
        yield TaskC(
            required_task=A2,
            params=dict(param2="foo"))

De cette façon, s'il a une forme comme l'exemple 2, tout ce que vous avez à faire est de réécrire Tout, et vous n'avez pas à définir quelque chose comme TaskB2 ou TaskC2 comme dans l'exemple 1.

Comment garder les résultats cohérents

Tout ce qui suit concerne la méthode complète. Le comportement par défaut de la méthode complète de la classe Task est simplement de savoir si la méthode existante de la cible de sortie renvoie True. Si l'obtention d'une sortie cohérente est une priorité élevée, il est préférable de définir une méthode complète qui respecte les trois points suivants, plutôt que d'utiliser la méthode complète par défaut. Cependant, il peut ne pas convenir à certaines applications car il augmente la situation dans laquelle le flux de travail est arrêté ou recalculé.

vérifie si toutes les tâches dépendantes se terminent, renvoie Vrai

Si vous définissez que toutes les tâches incluses dans le flux de travail appellent la méthode complète de la tâche dépendante, lorsque l'achèvement de la tâche à la fin du flux de travail est appelé, complete est appelé de manière récursive pour l'ensemble du flux de travail. S'il trouve un endroit où complete renvoie False, toutes les tâches en aval seront exécutées. En passant, la méthode complète de WrapperTask est exactement l'opération de "renvoyer True si toutes les valeurs de retour de complete de la tâche dépendante sont True".

La méthode complète vérifie les horodatages d'entrée / sortie

Assurez-vous que la méthode complète renvoie False si la date et l'heure de sortie sont antérieures à la date et l'heure d'entrée. Ainsi, après avoir corrigé une partie du résultat intermédiaire, la correction sera reflétée dans toute la partie nécessaire.

Mettez le processus de vérification du résultat du calcul dans la méthode complète

Si vous obtenez des résultats étranges au milieu du flux de travail, vous pouvez vous arrêter à ce stade. De plus, même si le résultat est obtenu, le calcul peut être effectué en réexécutant le workflow.

Recommended Posts

Pour améliorer la réutilisabilité et la maintenabilité des flux de travail créés avec Luigi
J'ai essayé d'améliorer l'efficacité du travail quotidien avec Python
10 méthodes pour améliorer la précision de BERT
Déplacez ce que vous avez installé avec pip dans l'environnement conda
Pour améliorer la réutilisabilité et la maintenabilité des flux de travail créés avec Luigi
Instanciation de l'environnement de développement BOX créé précédemment
Essayez de séparer l'arrière-plan et l'objet en mouvement de la vidéo avec OpenCV
Script pour tweeter avec des multiples de 3 et des nombres avec 3 !!
Spécifiez les positions de début et de fin des fichiers à inclure avec qiitap
Ajoutez des informations au bas de la figure avec Matplotlib
Extraire des images et des tableaux de pdf avec python pour réduire la charge de reporting
J'ai essayé d'automatiser la mise à jour de l'article du blog Livedoor avec Python et sélénium.
Visualisez la gamme d'insertions internes et externes avec python
Essayez d'obtenir le contenu de Word avec Golang
Je voulais juste extraire les données de la date et de l'heure souhaitées avec Django
J'ai essayé de comparer la vitesse de traitement avec dplyr de R et pandas de Python
Comment générer le nombre de vues, de likes et de stocks d'articles publiés sur Qiita au format CSV (créé avec "Python + Qiita API v2")
J'ai essayé de publier automatiquement sur ChatWork au moment du déploiement avec Fabric et ChatWork Api
Renvoyez les données d'image avec Flask of Python et dessinez-les dans l'élément canvas de HTML
Comment insérer un processus spécifique au début et à la fin de l'araignée avec la tremblante
J'ai essayé de trouver l'entropie de l'image avec python
Un diagramme de réseau a été créé avec les données du COVID-19.
Découvrez la puissance de l'accélération avec NumPy / SciPy
Essayez d'améliorer la précision de l'estimation du nombre de Twitter
Accrocher à la première importation du module et imprimer le chemin du module
Essayez d'automatiser le fonctionnement des périphériques réseau avec Python
Je veux connaître la nature de Python et pip
Commandes et fichiers pour vérifier la version de CentOS Linux
Jouez avec le mécanisme de mot de passe de GitHub Webhook et Python
Récupérez la source de la page à charger indéfiniment avec python.
Essayez d'extraire les caractéristiques des données de capteur avec CNN
À propos des composants de Luigi
Traitez le fichier gzip UNLOADed avec Redshift avec Python de Lambda, gzipez-le à nouveau et téléchargez-le sur S3
J'ai comparé la vitesse de Hash avec Topaz, Ruby et Python
L'histoire de ne pas pouvoir exécuter pygame avec pycharm
Répétez avec While. Script pour tweeter ou rechercher depuis le terminal
Enregistrez le résultat de l'exploration avec Scrapy dans Google Data Store
Familiarisez-vous avec (voulez être) autour du pipeline de spaCy
J'ai essayé d'automatiser l'arrosage du pot avec Raspberry Pi
Comment obtenir l'ID de Type2Tag NXP NTAG213 avec nfcpy
[EC2] Comment installer Chrome et le contenu de chaque commande
[Introduction à Python] J'ai comparé les conventions de nommage de C # et Python.
[Python] Comment obtenir le premier et le dernier jour du mois
[Introduction à StyleGAN] J'ai joué avec "The Life of a Man" ♬
Essayez de résoudre le problème N Queen avec SA de PyQUBO
Je veux sortir le début du mois prochain avec Python
Analyse de correspondance des phrases avec l'API COTOHA et sauvegarde dans un fichier
Considérez la vitesse de traitement pour déplacer le tampon d'image avec numpy.ndarray
Résolution du labyrinthe avec Python-Supplément au chapitre 6 de la référence rapide de l'algorithme-
Comment surveiller l'état d'exécution de sqlldr avec la commande pv
Obtenir l'URL du ticket JIRA créé par la bibliothèque jira-python
[Objet obligatoire DI] Implémenter et comprendre le mécanisme de DI avec Go
J'ai résumé comment changer les paramètres de démarrage de GRUB et GRUB2
La meilleure façon d'utiliser MeCab et CaboCha avec Google Colab
Je veux vérifier la position de mon visage avec OpenCV!
De l'introduction de JUMAN ++ à l'analyse morphologique du japonais avec Python
Convertissez le résultat de python optparse en dict et utilisez-le
PhytoMine-I a essayé d'obtenir les informations génétiques de la plante avec Python
J'ai essayé de faire la différence de Config avant et après le travail avec le script pyATS / Genie self-made