In this article, based on my experience using Luigi, a workflow management framework that runs on Python, for data analysis, wouldn't it be easier to maintain a workflow like this? It describes the method that I thought. Some may find it inappropriate to say below, but it seems that these things haven't been discussed much in the first place, so I hope it will be a source of discussion. I will.
It doesn't touch on what tools Luigi is, what its advantages are, and what its basic usage is. Some of Qiita have already written an article, so please refer to that (Big data analysis using data flow control framework Luigi / 453aeec7f4f420b91241), Workflow management with Luigi, etc.).
Regarding the handling of Parameters, which are indispensable for controlling the operation of tasks.
Obviously, if you don't use Parameter, Luigi won't recognize the task you want to run with different settings as different. Also, to set an instance variable other than Parameter when loading a workflow, you need to write a \ _ \ _ init__ method, which makes the description unnecessary and complicated.
@ requirements
and @ inherits
decorators in situations such as those listed in this and the next section.
https://luigi.readthedocs.io/en/stable/api/luigi.util.htmlIn Luigi, when you instantiate a task that inherits a task as a dependent task, you must set all the Parameters defined in the class variables in the inherited classes. For example, suppose you define a task group as follows.
class Parent(luigi.ExternalTask):
hoge = luigi.Parameter()
class Child(Parent):
foo = luigi.Parameter()
#MissingParameterException is thrown
class All1(luigi.WrapperTask):
def requires(self):
yield Child(foo="foo")
#Will be executed
class All2(luigi.WrapperTask):
def requires(self):
yield Child(foo="foo", hoge="hoge")
All2 above is executable, but All1 trying to instantiate a Child with only foo set to a value will throw a MissingParameterException when trying to do it. In addition to Child's class variable foo, you must also set hoge, which is defined as a Parent's class variable. If so, it would be kinder to be explicit about what needs to be set as follows.
class Child(Parent):
foo = luigi.Parameter()
hoge = luigi.Parameter()
For example, in a task that outputs using csv.writer, consider a situation where you want to be able to flow calculations that change the behavior of csv.writer with keyword arguments. At this time, rather than having each keyword argument of csv.writer as a Parameter, it is clearer and more flexible to change it if it is stored in one Parameter as shown below.
class OutputCSV(luigi.Task):
csv_writer_kwargs = luigi.Parameter(default=dict(sep='\t'))
...
def run(self):
with open(self.output().fn, 'wb') as file:
writer = csv.writer(file, **self.csv_writer_kwargs)
...
For example, suppose you have a workflow in which TaskB and TaskC depend on TaskA. Now, if you're likely to create a workflow that uses TaskB or TaskC for another task, it's better to do something like Example 2 rather than Example 1 below.
Example 1
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskB(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskC(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
Example 2
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskB(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class TaskC(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
Now suppose you want to add the process of executing TaskB and TaskC on the result of TaskA2. The following is a comparison of the necessary corrections for both.
Modification from Example 1
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskA2(luigi.ExternalTask):
param2 = luigi.Parameter()
...
class TaskB(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskC(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskB2(TaskB):
def requires(self):
yield TaskA2(param2="foo")
...
class TaskC2(TaskC):
def requires(self):
yield TaskA2(param2="foo")
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
yield TaskB2()
yield TaskC2()
Modification from Example 2
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskA2(luigi.ExternalTask):
param2 = luigi.Parameter()
...
class TaskB(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class TaskC(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
yield TaskB(
required_task=A2,
params=dict(param2="foo"))
yield TaskC(
required_task=A2,
params=dict(param2="foo"))
In this way, if it is a form like Example 2, all you have to do is rewrite All, and you do not have to define something like TaskB2 or TaskC2 as in Example 1.
Everything below concerns the complete method. The default behavior of the complete method of the Task class is simply whether the existing method of the output Target returns True. If getting consistent output is a high priority, it's better to define a complete method that adheres to the following three points, rather than using the default complete method. However, it may not be suitable for some applications because it increases the number of situations where the workflow stops or is recalculated.
If you define that all the tasks included in the workflow call the complete method of the dependent task, when the complete task at the end of the workflow is called, complete is recursively called for the entire workflow. And if it finds a place where complete returns False, all the tasks downstream from it will be executed. By the way, the complete method of WrapperTask is exactly this "return True if all the return values of complete of the dependent task are True".
Make sure that the complete method returns False if the output date and time is earlier than the input date and time. This is so that after correcting a part of the intermediate result, the correction will be reflected in the entire necessary part.
If you get strange results in the middle of the workflow, you can stop at that point. Also, even if the result is obtained, the calculation can be performed by re-executing the workflow.
Recommended Posts