Wäre es in diesem Artikel aufgrund meiner Erfahrung mit Luigi, einem auf Python ausgeführten Workflow-Management-Framework für die Datenanalyse, nicht einfacher, einen solchen Workflow zu verwalten? Es beschreibt die Methode, die ich dachte. Einige Leute mögen denken, dass es unangemessen ist, unten zu sagen, aber es scheint, dass solche Dinge überhaupt nicht viel diskutiert werden, also denke ich, dass es eine Quelle der Diskussion sein wird. Ich werde.
Es geht nicht darum, welche Tools Luigi ist, welche Vorteile es bietet und wie es grundlegend verwendet wird. Einige von Qiita haben bereits Artikel geschrieben. Lesen Sie sie daher bitte (Big-Data-Analyse mit dem Datenflusskontroll-Framework Luigi durchführen. / 453aeec7f4f420b91241), Workflow-Management mit Luigi usw.).
In Bezug auf den Umgang mit Parametern, die für die Steuerung des Arbeitsablaufs unverzichtbar sind.
Wenn Sie keine Parameter verwenden, erkennt Luigi die Aufgaben, die Sie mit unterschiedlichen Einstellungen ausführen möchten, natürlich nicht als unterschiedlich. Darüber hinaus muss beim Laden des Workflows die Methode \ _ \ _ init__ geschrieben werden, um die andere Instanzvariable als Parameter festzulegen, wodurch die Beschreibung unnötig und kompliziert wird.
In Luigi müssen beim Instanziieren einer Aufgabe, die eine Aufgabe als abhängige Aufgabe erbt, alle in den Klassenvariablen der geerbten Klassen definierten Parameter festgelegt werden. Angenommen, Sie definieren eine Aufgabengruppe wie folgt.
class Parent(luigi.ExternalTask):
hoge = luigi.Parameter()
class Child(Parent):
foo = luigi.Parameter()
#MissingParameterException wird ausgelöst
class All1(luigi.WrapperTask):
def requires(self):
yield Child(foo="foo")
#Wird durchgeführt
class All2(luigi.WrapperTask):
def requires(self):
yield Child(foo="foo", hoge="hoge")
All2 oben ist realisierbar, aber All1, der versucht, ein Kind zu instanziieren, wobei nur foo auf einen Wert gesetzt ist, löst eine MissingParameterException aus, wenn er dies versucht. Zusätzlich zur untergeordneten Klassenvariablen foo müssen Sie auch hoge festlegen, das als Klassenvariable eines Elternteils definiert ist. Wenn ja, wäre es freundlicher, explizit anzugeben, was wie folgt eingestellt werden muss.
class Child(Parent):
foo = luigi.Parameter()
hoge = luigi.Parameter()
Stellen Sie sich beispielsweise in einer Aufgabe, die mit csv.writer ausgegeben wird, eine Situation vor, in der Sie Berechnungen durchführen möchten, die das Verhalten von csv.writer mit Schlüsselwortargumenten ändern. Anstatt jedes Schlüsselwortargument von csv.writer als Parameter zu haben, ist es zu diesem Zeitpunkt klarer und flexibler, es zu ändern, wenn es in einem Parameter gespeichert ist, wie unten gezeigt.
class OutputCSV(luigi.Task):
csv_writer_kwargs = luigi.Parameter(default=dict(sep='\t'))
...
def run(self):
with open(self.output().fn, 'wb') as file:
writer = csv.writer(file, **self.csv_writer_kwargs)
...
Angenommen, Sie haben einen Workflow, in dem TaskB und TaskC von TaskA abhängen. Wenn Sie jetzt wahrscheinlich einen Workflow erstellen, der TaskB oder TaskC für eine andere Aufgabe verwendet, ist es besser, Beispiel 2 als Beispiel 1 unten auszuführen.
Beispiel 1
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskB(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskC(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
Beispiel 2
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskB(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class TaskC(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
Angenommen, Sie möchten den Prozess zum Ausführen von TaskB und TaskC zum Ergebnis von TaskA2 hinzufügen. Das Folgende ist ein Vergleich der notwendigen Korrekturen für beide.
Modifikation aus Beispiel 1
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskA2(luigi.ExternalTask):
param2 = luigi.Parameter()
...
class TaskB(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskC(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskB2(TaskB):
def requires(self):
yield TaskA2(param2="foo")
...
class TaskC2(TaskC):
def requires(self):
yield TaskA2(param2="foo")
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
yield TaskB2()
yield TaskC2()
Modifikation aus Beispiel 2
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskA2(luigi.ExternalTask):
param2 = luigi.Parameter()
...
class TaskB(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class TaskC(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
yield TaskB(
required_task=A2,
params=dict(param2="foo"))
yield TaskC(
required_task=A2,
params=dict(param2="foo"))
Auf diese Weise müssen Sie, wenn es sich um die Form von Beispiel 2 handelt, lediglich Alle neu schreiben, und Sie müssen nicht wie in Beispiel 1 etwas wie TaskB2 oder TaskC2 definieren.
Alles unten betrifft die vollständige Methode. Das Standardverhalten der vollständigen Methode der Task-Klasse ist einfach, ob die vorhandene Methode des Ausgabeziels True zurückgibt. Wenn das Erhalten einer konsistenten Ausgabe eine hohe Priorität hat, ist es besser, eine vollständige Methode zu definieren, die die folgenden drei Punkte einhält, als die standardmäßige vollständige Methode zu verwenden. Es ist jedoch möglicherweise für einige Anwendungen nicht geeignet, da es die Situation erhöht, in der der Workflow stoppt oder neu berechnet wird.
Wenn Sie definieren, dass alle im Workflow enthaltenen Aufgaben die vollständige Methode der abhängigen Aufgabe aufrufen, wird beim Aufrufen des Abschlusses der Aufgabe am Ende des Workflows der Abschluss für den gesamten Workflow rekursiv aufgerufen. Wenn ein Ort gefunden wird, an dem complete False zurückgibt, werden alle nachgeschalteten Aufgaben ausgeführt. Übrigens ist die vollständige Methode von WrapperTask genau die Operation "Rückgabe von True, wenn alle Rückgabewerte von complete der abhängigen Aufgabe True sind".
Stellen Sie sicher, dass die vollständige Methode False zurückgibt, wenn Ausgabedatum und -zeit vor dem Eingabedatum und der Eingangszeit liegen. Dies ist so, dass nach der Korrektur eines Teils des Zwischenergebnisses die Korrektur im gesamten erforderlichen Teil wiedergegeben wird.
Wenn Sie in der Mitte des Workflows seltsame Ergebnisse erzielen, können Sie an diesem Punkt anhalten. Selbst wenn das Ergebnis erhalten wird, kann die Berechnung durch erneutes Ausführen des Workflows durchgeführt werden.
Recommended Posts