Dans cet article, basé sur mon expérience avec Luigi, un framework de gestion de flux de travail qui s'exécute sur Python, pour l'analyse de données, ne serait-il pas plus facile de maintenir un flux de travail comme celui-ci? Il décrit la méthode que je pensais. Certaines personnes peuvent penser qu'il est inapproprié de dire ci-dessous, mais il semble que de telles choses ne sont pas beaucoup discutées en premier lieu, donc je pense que ce sera une source de discussion. Je vais.
Il ne concerne pas les outils de Luigi, ses avantages et son utilisation de base. Certains de Qiita ont déjà écrit des articles, alors veuillez vous y référer (Effectuer une analyse de Big Data en utilisant le framework de contrôle de flux de données Luigi. / 453aeec7f4f420b91241), Gestion des flux de travail avec Luigi, etc.).
Concernant la gestion des paramètres, indispensables pour contrôler le fonctionnement des tâches.
De toute évidence, si vous n'utilisez pas Parameter, Luigi ne reconnaîtra pas la tâche que vous souhaitez exécuter avec différents paramètres comme étant différente. De plus, il est nécessaire d'écrire la méthode \ _ \ _ init__ pour définir la variable d'instance autre que Parameter lors du chargement du workflow, ce qui rend la description inutile et compliquée.
@ requirements
et @ inherits
dans des situations telles que celles répertoriées dans cette section et la suivante.
https://luigi.readthedocs.io/en/stable/api/luigi.util.htmlDans Luigi, lors de l'instanciation d'une tâche qui hérite d'une tâche en tant que tâche dépendante, tous les paramètres définis dans les variables de classe dans les classes héritées doivent être définis. Par exemple, supposons que vous définissiez un groupe de tâches comme suit.
class Parent(luigi.ExternalTask):
hoge = luigi.Parameter()
class Child(Parent):
foo = luigi.Parameter()
#MissingParameterException est levée
class All1(luigi.WrapperTask):
def requires(self):
yield Child(foo="foo")
#Sera exécuté
class All2(luigi.WrapperTask):
def requires(self):
yield Child(foo="foo", hoge="hoge")
All2 ci-dessus est viable, mais All1 essayant d'instancier un Child avec seulement foo set une valeur lève une MissingParameterException en essayant de le faire. En plus de la variable de classe enfant foo, vous devez également définir hoge, qui est définie comme une variable de classe Parent. Si tel est le cas, il serait plus gentil d'être explicite sur ce qui doit être défini comme suit.
class Child(Parent):
foo = luigi.Parameter()
hoge = luigi.Parameter()
Par exemple, dans une tâche qui génère à l'aide de csv.writer, envisagez une situation dans laquelle vous souhaitez pouvoir exécuter des calculs qui modifient le comportement de csv.writer avec des arguments de mot-clé. Pour le moment, plutôt que d'avoir chaque argument mot-clé de csv.writer comme paramètre, il est plus clair et plus flexible de le modifier s'il est stocké dans un paramètre comme indiqué ci-dessous.
class OutputCSV(luigi.Task):
csv_writer_kwargs = luigi.Parameter(default=dict(sep='\t'))
...
def run(self):
with open(self.output().fn, 'wb') as file:
writer = csv.writer(file, **self.csv_writer_kwargs)
...
Par exemple, supposons que vous ayez un flux de travail dans lequel TaskB et TaskC dépendent de TaskA. Maintenant, si vous êtes susceptible de créer un flux de travail qui utilise TaskB ou TaskC pour une autre tâche, il est préférable de faire quelque chose comme l'exemple 2 plutôt que l'exemple 1 ci-dessous.
Exemple 1
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskB(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskC(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
Exemple 2
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskB(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class TaskC(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
Supposons maintenant que vous souhaitiez ajouter le processus d'exécution de TaskB et TaskC sur le résultat de TaskA2. Ce qui suit est une comparaison des corrections nécessaires pour les deux.
Modification de l'exemple 1
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskA2(luigi.ExternalTask):
param2 = luigi.Parameter()
...
class TaskB(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskC(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskB2(TaskB):
def requires(self):
yield TaskA2(param2="foo")
...
class TaskC2(TaskC):
def requires(self):
yield TaskA2(param2="foo")
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
yield TaskB2()
yield TaskC2()
Modification de l'exemple 2
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskA2(luigi.ExternalTask):
param2 = luigi.Parameter()
...
class TaskB(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class TaskC(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
yield TaskB(
required_task=A2,
params=dict(param2="foo"))
yield TaskC(
required_task=A2,
params=dict(param2="foo"))
De cette façon, s'il a une forme comme l'exemple 2, tout ce que vous avez à faire est de réécrire Tout, et vous n'avez pas à définir quelque chose comme TaskB2 ou TaskC2 comme dans l'exemple 1.
Tout ce qui suit concerne la méthode complète. Le comportement par défaut de la méthode complète de la classe Task est simplement de savoir si la méthode existante de la cible de sortie renvoie True. Si l'obtention d'une sortie cohérente est une priorité élevée, il est préférable de définir une méthode complète qui respecte les trois points suivants, plutôt que d'utiliser la méthode complète par défaut. Cependant, il peut ne pas convenir à certaines applications car il augmente la situation dans laquelle le flux de travail est arrêté ou recalculé.
Si vous définissez que toutes les tâches incluses dans le flux de travail appellent la méthode complète de la tâche dépendante, lorsque l'achèvement de la tâche à la fin du flux de travail est appelé, complete est appelé de manière récursive pour l'ensemble du flux de travail. S'il trouve un endroit où complete renvoie False, toutes les tâches en aval seront exécutées. En passant, la méthode complète de WrapperTask est exactement l'opération de "renvoyer True si toutes les valeurs de retour de complete de la tâche dépendante sont True".
Assurez-vous que la méthode complète renvoie False si la date et l'heure de sortie sont antérieures à la date et l'heure d'entrée. Ainsi, après avoir corrigé une partie du résultat intermédiaire, la correction sera reflétée dans toute la partie nécessaire.
Si vous obtenez des résultats étranges au milieu du flux de travail, vous pouvez vous arrêter à ce stade. De plus, même si le résultat est obtenu, le calcul peut être effectué en réexécutant le workflow.
Recommended Posts