[PYTHON] Verbesserung der Wiederverwendbarkeit und Wartbarkeit von mit Luigi erstellten Workflows

Über diesen Artikel

Wäre es in diesem Artikel aufgrund meiner Erfahrung mit Luigi, einem auf Python ausgeführten Workflow-Management-Framework für die Datenanalyse, nicht einfacher, einen solchen Workflow zu verwalten? Es beschreibt die Methode, die ich dachte. Einige Leute mögen denken, dass es unangemessen ist, unten zu sagen, aber es scheint, dass solche Dinge überhaupt nicht viel diskutiert werden, also denke ich, dass es eine Quelle der Diskussion sein wird. Ich werde.

Es geht nicht darum, welche Tools Luigi ist, welche Vorteile es bietet und wie es grundlegend verwendet wird. Einige von Qiita haben bereits Artikel geschrieben. Lesen Sie sie daher bitte (Big-Data-Analyse mit dem Datenflusskontroll-Framework Luigi durchführen. / 453aeec7f4f420b91241), Workflow-Management mit Luigi usw.).

Verwendung von Parameter

In Bezug auf den Umgang mit Parametern, die für die Steuerung des Arbeitsablaufs unverzichtbar sind.

Verwenden Sie keine anderen Instanzvariablen als die als Parameter definierten

Wenn Sie keine Parameter verwenden, erkennt Luigi die Aufgaben, die Sie mit unterschiedlichen Einstellungen ausführen möchten, natürlich nicht als unterschiedlich. Darüber hinaus muss beim Laden des Workflows die Methode \ _ \ _ init__ geschrieben werden, um die andere Instanzvariable als Parameter festzulegen, wodurch die Beschreibung unnötig und kompliziert wird.

Überschreiben Sie alle Parameter der übergeordneten Klasse, wenn Sie eine Aufgabe erstellen, indem Sie die Aufgabe erben

In Luigi müssen beim Instanziieren einer Aufgabe, die eine Aufgabe als abhängige Aufgabe erbt, alle in den Klassenvariablen der geerbten Klassen definierten Parameter festgelegt werden. Angenommen, Sie definieren eine Aufgabengruppe wie folgt.

class Parent(luigi.ExternalTask):
    hoge = luigi.Parameter()


class Child(Parent):
    foo = luigi.Parameter()


#MissingParameterException wird ausgelöst
class All1(luigi.WrapperTask):
    def requires(self):
        yield Child(foo="foo")


#Wird durchgeführt
class All2(luigi.WrapperTask):
    def requires(self):
        yield Child(foo="foo", hoge="hoge")

All2 oben ist realisierbar, aber All1, der versucht, ein Kind zu instanziieren, wobei nur foo auf einen Wert gesetzt ist, löst eine MissingParameterException aus, wenn er dies versucht. Zusätzlich zur untergeordneten Klassenvariablen foo müssen Sie auch hoge festlegen, das als Klassenvariable eines Elternteils definiert ist. Wenn ja, wäre es freundlicher, explizit anzugeben, was wie folgt eingestellt werden muss.

class Child(Parent):
    foo = luigi.Parameter()
    hoge = luigi.Parameter()

Verwenden Sie komplizierte Parametergruppen in einem Diktat

Stellen Sie sich beispielsweise in einer Aufgabe, die mit csv.writer ausgegeben wird, eine Situation vor, in der Sie Berechnungen durchführen möchten, die das Verhalten von csv.writer mit Schlüsselwortargumenten ändern. Anstatt jedes Schlüsselwortargument von csv.writer als Parameter zu haben, ist es zu diesem Zeitpunkt klarer und flexibler, es zu ändern, wenn es in einem Parameter gespeichert ist, wie unten gezeigt.

class OutputCSV(luigi.Task):

    csv_writer_kwargs = luigi.Parameter(default=dict(sep='\t'))
    ...
    def run(self):
        with open(self.output().fn, 'wb') as file:
            writer = csv.writer(file, **self.csv_writer_kwargs)
        ...

Abhängige Aufgaben, die sich wahrscheinlich ändern, machen die abhängige Aufgabenklasse selbst zu einem Parameter

Angenommen, Sie haben einen Workflow, in dem TaskB und TaskC von TaskA abhängen. Wenn Sie jetzt wahrscheinlich einen Workflow erstellen, der TaskB oder TaskC für eine andere Aufgabe verwendet, ist es besser, Beispiel 2 als Beispiel 1 unten auszuführen.

Beispiel 1


class TaskA(luigi.ExternalTask):
    param1 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...

class TaskC(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()

Beispiel 2


class TaskA(luigi.ExternalTask):

    param1 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class TaskC(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()

Angenommen, Sie möchten den Prozess zum Ausführen von TaskB und TaskC zum Ergebnis von TaskA2 hinzufügen. Das Folgende ist ein Vergleich der notwendigen Korrekturen für beide.

Modifikation aus Beispiel 1


class TaskA(luigi.ExternalTask):

    param1 = luigi.Parameter()
    ...

class TaskA2(luigi.ExternalTask):

    param2 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...

class TaskC(luigi.Task):

    def requires(self):
        yield TaskA(param1="hoge")
    ...


class TaskB2(TaskB):

    def requires(self):
        yield TaskA2(param2="foo")
    ...

class TaskC2(TaskC):

    def requires(self):
        yield TaskA2(param2="foo")
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()
        yield TaskB2()
        yield TaskC2()

Modifikation aus Beispiel 2


class TaskA(luigi.ExternalTask):

    param1 = luigi.Parameter()
    ...

class TaskA2(luigi.ExternalTask):

    param2 = luigi.Parameter()
    ...

class TaskB(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class TaskC(luigi.Task):

    required_task = luigi.Parameter(default=TaskA)
    params = luigi.Parameter(default=dict(param1="hoge"))

    def requires(self):
        yield self.required_task(**self.params)
    ...

class All(luigi.WrapperTask):

    def requires(self):
        yield TaskB()
        yield TaskC()
        yield TaskB(
            required_task=A2,
            params=dict(param2="foo"))
        yield TaskC(
            required_task=A2,
            params=dict(param2="foo"))

Auf diese Weise müssen Sie, wenn es sich um die Form von Beispiel 2 handelt, lediglich Alle neu schreiben, und Sie müssen nicht wie in Beispiel 1 etwas wie TaskB2 oder TaskC2 definieren.

So halten Sie die Ergebnisse konsistent

Alles unten betrifft die vollständige Methode. Das Standardverhalten der vollständigen Methode der Task-Klasse ist einfach, ob die vorhandene Methode des Ausgabeziels True zurückgibt. Wenn das Erhalten einer konsistenten Ausgabe eine hohe Priorität hat, ist es besser, eine vollständige Methode zu definieren, die die folgenden drei Punkte einhält, als die standardmäßige vollständige Methode zu verwenden. Es ist jedoch möglicherweise für einige Anwendungen nicht geeignet, da es die Situation erhöht, in der der Workflow stoppt oder neu berechnet wird.

vollständige Überprüfungen, ob alle abhängigen Aufgaben abgeschlossen sind, geben True zurück

Wenn Sie definieren, dass alle im Workflow enthaltenen Aufgaben die vollständige Methode der abhängigen Aufgabe aufrufen, wird beim Aufrufen des Abschlusses der Aufgabe am Ende des Workflows der Abschluss für den gesamten Workflow rekursiv aufgerufen. Wenn ein Ort gefunden wird, an dem complete False zurückgibt, werden alle nachgeschalteten Aufgaben ausgeführt. Übrigens ist die vollständige Methode von WrapperTask genau die Operation "Rückgabe von True, wenn alle Rückgabewerte von complete der abhängigen Aufgabe True sind".

Die vollständige Methode überprüft die Eingabe- / Ausgabezeitstempel

Stellen Sie sicher, dass die vollständige Methode False zurückgibt, wenn Ausgabedatum und -zeit vor dem Eingabedatum und der Eingangszeit liegen. Dies ist so, dass nach der Korrektur eines Teils des Zwischenergebnisses die Korrektur im gesamten erforderlichen Teil wiedergegeben wird.

Geben Sie den Überprüfungsprozess des Berechnungsergebnisses in die vollständige Methode ein

Wenn Sie in der Mitte des Workflows seltsame Ergebnisse erzielen, können Sie an diesem Punkt anhalten. Selbst wenn das Ergebnis erhalten wird, kann die Berechnung durch erneutes Ausführen des Workflows durchgeführt werden.

Recommended Posts

Verbesserung der Wiederverwendbarkeit und Wartbarkeit von mit Luigi erstellten Workflows
Ich habe versucht, die Effizienz der täglichen Arbeit mit Python zu verbessern
10 Methoden zur Verbesserung der Genauigkeit von BERT
Verschieben Sie das, was Sie mit pip installiert haben, in die Conda-Umgebung
Verbesserung der Wiederverwendbarkeit und Wartbarkeit von mit Luigi erstellten Workflows
Instanziierung der zuvor erstellten BOX-Entwicklungsumgebung
Versuchen Sie, den Hintergrund und das sich bewegende Objekt des Videos mit OpenCV zu trennen
Skript zum Twittern mit Vielfachen von 3 und Zahlen mit 3 !!
Geben Sie die Start- und Endpositionen der Dateien an, die in qiitap enthalten sein sollen
Fügen Sie mit Matplotlib Informationen am unteren Rand der Abbildung hinzu
Extrahieren Sie Bilder und Tabellen mit Python aus PDF, um die Berichtslast zu verringern
Ich habe versucht, das Artikel-Update des Livedoor-Blogs mit Python und Selen zu automatisieren.
Visualisieren Sie den Bereich der internen und externen Einfügungen mit Python
Versuchen Sie, den Inhalt von Word mit Golang zu erhalten
Ich wollte nur die Daten des gewünschten Datums und der gewünschten Uhrzeit mit Django extrahieren
Ich habe versucht, die Verarbeitungsgeschwindigkeit mit dplyr von R und pandas von Python zu vergleichen
So geben Sie die Anzahl der ANSICHTEN, Likes und Bestände von Artikeln aus, die in Qiita an CSV gesendet wurden (erstellt mit "Python + Qiita API v2")
Ich habe versucht, zum Zeitpunkt der Bereitstellung mit Fabric und ChatWork Api automatisch in ChatWork zu posten
Geben Sie die Bilddaten mit Flask of Python zurück und zeichnen Sie sie in das Canvas-Element von HTML
Wie man einen bestimmten Prozess am Anfang und Ende der Spinne mit Scrapy einfügt
Ich habe versucht, die Entropie des Bildes mit Python zu finden
Mit den Daten von COVID-19 wurde ein Netzwerkdiagramm erstellt.
Sehen Sie, wie schnell Sie mit NumPy / SciPy beschleunigen können
Versuchen Sie, die Genauigkeit der Twitter-ähnlichen Zahlenschätzung zu verbessern
Schließen Sie den ersten Import des Moduls an und drucken Sie den Modulpfad
Versuchen Sie, den Betrieb von Netzwerkgeräten mit Python zu automatisieren
Ich möchte die Natur von Python und Pip kennenlernen
Befehle und Dateien zum Überprüfen der Version von CentOS Linux
Spielen Sie mit dem Passwortmechanismus von GitHub Webhook und Python
Holen Sie sich die Quelle der Seite unbegrenzt mit Python zu laden.
Versuchen Sie, Merkmale von Sensordaten mit CNN zu extrahieren
Über die Komponenten von Luigi
Verarbeiten Sie die mit Redshift entladene gzip-Datei mit Python of Lambda, gzipen Sie sie erneut und laden Sie sie in S3 hoch
Ich habe die Geschwindigkeit von Hash mit Topaz, Ruby und Python verglichen
Die Geschichte, dass man mit Pycharm kein Pygame spielen kann
Wiederholen Sie mit While. Skript zum Twittern oder Suchen vom Terminal aus
Speichern Sie das Ergebnis des Crawls mit Scrapy im Google Data Store
Machen Sie sich mit der Pipeline von spaCy vertraut (wollen Sie es sein)
Ich habe versucht, die Bewässerung des Pflanzgefäßes mit Raspberry Pi zu automatisieren
So erhalten Sie die ID von Type2Tag NXP NTAG213 mit nfcpy
[EC2] So installieren Sie Chrome und den Inhalt jedes Befehls
[Einführung in Python] Ich habe die Namenskonventionen von C # und Python verglichen.
[Python] So erhalten Sie den ersten und den letzten Tag des Monats
[Einführung in StyleGAN] Ich habe mit "The Life of a Man" ♬ gespielt
Versuchen Sie, das N Queen-Problem mit SA von PyQUBO zu lösen
Ich möchte den Anfang des nächsten Monats mit Python ausgeben
Korrespondenzanalyse von Sätzen mit COTOHA API und Speichern in Datei
Berücksichtigen Sie die Verarbeitungsgeschwindigkeit, um den Bildpuffer mit numpy.ndarray zu verschieben
Lösen des Labyrinths mit Python-Ergänzung zu Kapitel 6 der Algorithmus-Kurzreferenz-
So überwachen Sie den Ausführungsstatus von sqlldr mit dem Befehl pv
Rufen Sie die URL des von der Jira-Python-Bibliothek erstellten JIRA-Tickets ab
[Erforderliches Thema DI] Implementieren und verstehen Sie den Mechanismus von DI mit Go
Ich habe zusammengefasst, wie die Boot-Parameter von GRUB und GRUB2 geändert werden
Die stärkste Möglichkeit, MeCab und CaboCha mit Google Colab zu verwenden
Ich möchte die Position meines Gesichts mit OpenCV überprüfen!
Von der Einführung von JUMAN ++ bis zur morphologischen Analyse von Japanisch mit Python
Konvertieren Sie das Ergebnis von Python Optparse, um es zu diktieren und zu verwenden
PhytoMine-I hat versucht, mit Python die genetischen Informationen der Pflanze zu erhalten
Ich habe versucht, den Unterschied zwischen Config vor und nach der Arbeit mit pyATS / Genie selbst erstelltem Skript zu berücksichtigen