Datenpipeline-Aufbau mit Python und Luigi

Einführung

Dieser Artikel ist eine japanische Übersetzung von Erstellen von Datenpipelines mit Python und Luigi. Der ursprüngliche Artikel war gut gemacht, also habe ich versucht, ihn zu übersetzen, obwohl ich mir nicht sicher war, für mein eigenes Verständnis. Wenn Sie Fehler haben, teilen Sie uns dies bitte in den Kommentaren mit.

Datenpipeline-Aufbau mit Python und Luigi

Für Datenwissenschaftler ist der tägliche Betrieb oft mehr Forschung und Entwicklung als das Engineering. Der Prozess vom Prototyp zum Produkt erfordert jedoch einiges an Umbauaufwand, wobei schnelle und schlammige Entscheidungen die nächstbeste Option sind ^ 1. Dies verzögert immer die Innovation und im Allgemeinen das gesamte Projekt.

In diesem Artikel werden die Erfahrungen beim Erstellen einer Datenpipeline erläutert: Alle allgemeinen Schritte, die zum Aufbereiten von Daten für ein datengesteuertes Produkt erforderlich sind, z. B. Datenextraktion, Bereinigung, Zusammenführung und Vorverarbeitung. Besonderes Augenmerk liegt auf ** Daten-Piping ** und darauf, wie ein Workflow-Manager wie Luigi ein Retter sein kann, ohne Sie zu stören. Mit minimalem Aufwand wird der Übergang vom Prototyp zum Produkt reibungslos verlaufen.

Ein Beispielcode ist unter [GitHub Gist] verfügbar (https://gist.github.com/bonzanini/40774bf9348d35d9ea4f).

Prototyp bisher

In früheren Prototypen sah die Datenpipeline ungefähr so aus:

$ python get_some_data.py
$ python clean_some_data.py
$ python join_other_data.py
$ python do_stuff_with_data.py

In der vorbereitenden experimentellen Phase eines Datenprojekts sind folgende Dinge häufig anzutreffen: Eine Vorverarbeitung ist erforderlich, was wahrscheinlich zu einem schnellen Hack führt ^ 2, sodass es von Best Practices für das Engineering geplagt wird. Und die Anzahl der Skripte schwillt an und die Datenpipeline wird gelöscht.

Dieser Ansatz hat nur den Vorteil, schnell und hackig zu sein. Der Nachteil ist, dass es langweilig ist: Sie möchten die Pipeline jedes Mal neu ausführen und müssen eine Reihe von Skripten nacheinander manuell aufrufen. Darüber hinaus gibt es viele Missverständnisse, wenn dieser Prototyp mit Kollegen geteilt wird (z. B. "Warum funktioniert" do_stuff_with_data "nicht?", "Haben Sie zuerst" clean_some_data "ausgeführt?" Usw.).

Die scheinbar hackige Lösung scheint alles in einem Skript zusammenzufassen. Nach einigem schnellen Refactoring würde das Skript "do_everything.py" folgendermaßen aussehen:

if __name__ == '__main__':
    get_some_data()
    clean_some_data()
    join_other_data()
    do_stuff_with_data()

Leicht zu schaffen:

$ python do_everything.py

(Hinweis: Sie können alles in einem Bash-Skript zusammenfassen, das eine Reihe von Skripten nacheinander aufruft, aber die Nachteile bleiben gleich.)

Codevorlage

Wenn wir in die produktreife Pipeline übergehen, müssen wir ein wenig über die Aspekte des Codes nachdenken, die alle Beispiele ausführen. Insbesondere sollte die Fehlerbehandlung berücksichtigt werden:

try:
    get_some_data()
except GetSomeDataError as e:
    #Fehlerbehandlung

Aber wenn alle Aufgaben zusammengestellt sind, wird es zu einem Versuch / außer Weihnachtsbaum:

try:
    get_some_data()
    try:
        clean_some_data()
        try:
            #Mach hier etwas...
        except EvenMoreErrors:
            # ...
    except CleanSomeDataError as e:
        #Behandeln Sie CleanSomeDataError
except GetSomeDataError as e:
    #Behandeln Sie GetSomeDataError

Ein weiterer wichtiger Aspekt ist die Wiederherstellung der Pipeline. Wenn beispielsweise die ersten Aufgaben abgeschlossen sind, aber unterwegs ein Fehler auftritt, wie können Sie die Pipeline erneut ausführen, ohne den ersten erfolgreichen Schritt erneut auszuführen?

#Überprüfen Sie, ob die Aufgabe bereits erfolgreich ist
if not i_got_the_data_already():
    #Wenn nicht, mach es
    try:
        get_some_date()
    except GetSomeDataError as e:
        #Fehlerbehandlung

Zu Luigi

Luigi ist ein Python-Tool für das Workflow-Management, das von Spotify entwickelt wurde, um komplexe Daten-Pipelines für Batch-Jobs zu erstellen. Die Installation von Luigi ist:

pip install luigi

Die nützlichen Funktionen von Luigi sind:

Es gibt zwei Schlüsselkonzepte, um zu verstehen, wie Luigi auf die Datenpipeline angewendet werden kann: Aufgaben und Ziele. Eine Aufgabe ist eine Sammlung von Aufgaben, die durch Erben der Klasse "luigi.Task" und Überschreiben einiger grundlegender Methoden dargestellt werden. Die Ausgabe der Aufgabe ist das Ziel, bei dem es sich möglicherweise um das lokale Dateisystem, Amazon S3 oder die Datenbank handelt.

Abhängigkeiten können von Ein- und Ausgängen definiert werden. Wenn beispielsweise Aufgabe B von Aufgabe A abhängt, bedeutet dies, dass die Ausgabe von Aufgabe A die Eingabe von Aufgabe B ist.

Schauen wir uns einige typische Aufgaben an:

# Filename: run_luigi.py
import luigi
 
class PrintNumbers(luigi.Task):
 
    def requires(self):
        return []
 
    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")
 
    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))
 
class SquaredNumbers(luigi.Task):
 
    def requires(self):
        return [PrintNumbers()]
 
    def output(self):
        return luigi.LocalTarget("squares.txt")
 
    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))
                 
if __name__ == '__main__':
    luigi.run()

Dieser Code enthält zwei Aufgaben: "PrintNumbers", mit denen Zahlen von 1 bis 10 zeilenweise in eine Datei mit dem Namen "number_up_to_10.txt" geschrieben werden, und "squares", die diese Datei lesen und zeilenweise mit quadratischen Zahlen paaren. Es ist eine "Quadratische Zahl", die in eine Datei namens ".txt" schreibt.

So führen Sie diese Aufgabe aus:

$ python run_luigi.py SquaredNumbers --local-scheduler

Luigi erwägt die Abhängigkeitsprüfung zwischen Aufgaben und stellt fest, dass keine "SquaredNumbers" -Eingabe vorhanden ist. Führen Sie daher zuerst die "PrintNumbers" -Aufgabe und dann die "SquaredNumbers" aus.

Das erste Argument, das Sie an Luigi übergeben, ist der Name der letzten Aufgabe in der Pipeline, die Sie ausführen möchten. Das zweite Argument weist Luigi einfach an, den lokalen Scheduler zu verwenden (dazu später mehr).

Sie können auch den Befehl luigi verwenden:

$ luigi -m run_luigi.py SquaredNumbers --local-scheduler

Aufgabenskelett

Um eine Luigi-Aufgabe zu erstellen, erstellen Sie einfach eine Klasse mit "luigi.Task" als übergeordnetem Element und überschreiben Sie einige Methoden. Insbesondere:

--requires () ist eine Liste abhängiger Aufgaben --output () ist das Ziel der Aufgabe (z. B. LocalTarget, S3Target usw.) --run () ist die Ausführungslogik

Ist. Luigi überprüft die Rückgabewerte von "require ()" und "output ()" und erstellt entsprechend einen Abhängigkeitsgraphen.

Parameter übergeben

Fest codierte Dateinamen und Einstellungen sind im Allgemeinen Anti-Patterns. Sobald Sie die Struktur und Dynamik einer Aufgabe verstanden haben, sollten Sie Ihre Einstellungen so parametrisieren, dass Sie dasselbe Skript dynamisch mit unterschiedlichen Argumenten aufrufen können.

Das ist die Klasse luigi.Parameter (). Jede Luigi-Aufgabe kann mehrere Parameter haben. Nehmen wir zum Beispiel im vorherigen Beispiel an, dass Sie die Nummer ändern können. Da wir Ganzzahlen als Parameter für die Funktion range () verwenden, können wir anstelle der Standardparameterklasse luigi.IntParameter verwenden. Die geänderte Aufgabe sieht folgendermaßen aus:

class PrintNumbers(luigi.Task):
    n = luigi.IntParameter()
 
    def requires(self):
        return []
 
    def output(self):
        return luigi.LocalTarget("numbers_up_to_{}.txt".format(self.n))
 
    def run(self):
        with self.output().open('w') as f:
            for i in range(1, self.n+1):
                f.write("{}\n".format(i))
 
class SquaredNumbers(luigi.Task):
    n = luigi.IntParameter()
 
    def requires(self):
        return [PrintNumbers(n=self.n)]
 
    def output(self):
        return luigi.LocalTarget("squares_up_to_{}.txt".format(self.n))
 
    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))

So erhöhen Sie die Aufgabe "SquaredNumbers" auf 20 und rufen sie auf:

$ python run_luigi.py SquaredNumbers --local-scheduler --n 20

Parameter können auch Standardwerte haben. Zum Beispiel:

n = luigi.IntParameter(default=10)

In diesem Fall wird 10 verwendet, sofern nicht das Argument "--n" angegeben ist.

Beispiel für GitHub Gist

Lokaler vs globaler Planer

Früher habe ich die Option "--local-scheduler" verwendet, als ich Luigis Aufgaben auf dem lokalen Scheduler ausführte. Dies ist nützlich für die Entwicklung, aber für Produktumgebungen sollten Sie einen zentralen Scheduler verwenden (siehe Dokumentation unter Scheduler).

Dies hat mehrere Vorteile:

So führen Sie den Luigi-Scheduler-Daemon im Vordergrund aus:

$ luigid

Im Hintergrund:

$ luigid --background

Standardmäßig wird Port 8082 verwendet, sodass Sie die Visualisierung anzeigen können, indem Sie mit Ihrem Browser auf http: // localhost: 8082 / zugreifen.

Wenn der globale Luigi-Scheduler ausgeführt wird, kann er ohne Optionen für den lokalen Scheduler erneut ausgeführt werden:

$ python run_luigi.py SquaredNumbers --n [BIG_NUMBER]

Der Beispielcode endet in Millisekunden. Wenn Sie jedoch zum Browser wechseln und das Abhängigkeitsdiagramm anzeigen möchten, während die Aufgabe noch ausgeführt wird, möchten Sie wahrscheinlich eine große Anzahl, z. B. 10.000.000 oder mehr, für die Option "--n". Du solltest es geben.

Ein Screenshot des Abhängigkeitsgraphen ist: dependency-graph-screenshot.png

Zusammenfassung

Wir haben die Definition einer Datenpipeline mit Luigi, einem in Python geschriebenen Workflow-Manager, besprochen. Luigi bietet eine schöne Abstraktion der Pipeline mit Aufgaben und Zielen und berücksichtigt auch Abhängigkeiten für Sie.

Aus der Perspektive der Wiederverwendung von Code und der Einstellung, vom Prototyp zum Produkt zu wechseln, habe ich individuelle Python-Pakete für Geschäftslogikaufgaben. Ich finde es nützlich, es als -and-Distribute-Python-Pakete /) zu definieren (dh mit der Datei setup.py). Auf diese Weise können Sie einfach "import your_package" aus Ihrem Luigi-Skript deklarieren und von dort aus aufrufen.

Es ist möglich, dass eine Aufgabe mehrere Dateien als Ausgabe generiert. In diesem Fall sollten Sie jedoch wahrscheinlich überlegen, ob die Aufgabe in kleinere Einheiten (dh mehrere Aufgaben) unterteilt werden kann. Sind ihre Ausgaben logisch gleich? Gibt es Abhängigkeiten? Wenn Sie eine Aufgabe nicht aufteilen können, ist es meiner Meinung nach einfach und bequem, output () einfach zu einer Protokolldatei zu machen, die den Namen der Aufgabe selbst, den Zeitstempel usw. kombiniert. Der Name der Protokolldatei lautet ungefähr "TaskName_timestamp_param1value_param2value_etc".

Workflow-Manager wie Luigi behandeln Abhängigkeiten, reduzieren die Anzahl der Codevorlagen für die Parameter- und Fehlerbehandlung, verwalten die Fehlerbehebung und folgen klaren Mustern bei der Entwicklung von Datenpipelines. Im Allgemeinen ist es nützlich, weil es tut.

Es ist wichtig, auch die Grenzen zu berücksichtigen:

--Lugi wurde für Batch-Jobs entwickelt und ist daher wahrscheinlich für die Verarbeitung in nahezu Echtzeit nutzlos


Recommended Posts

Datenpipeline-Aufbau mit Python und Luigi
Fröhliche GUI-Konstruktion mit Elektron und Python
Datenanalyse mit Python 2
Datenanalyse mit Python
Mit Python erstellte Beispieldaten
Programmieren mit Python und Tkinter
Ver- und Entschlüsselung mit Python
Python und Hardware-Verwenden von RS232C mit Python-
Holen Sie sich Youtube-Daten mit Python
Aufbau einer Datenanalyseumgebung mit Python (IPython Notebook + Pandas)
Python-Umgebungskonstruktion und TensorFlow
Untersuchen Sie den Java- und Python-Datenaustausch mit Apache Arrow
Python mit Pyenv und Venv
Funktioniert mit Python und R.
Lesen von JSON-Daten mit Python
Befreien Sie sich mit Python und regulären Ausdrücken von schmutzigen Daten
Löse das Spiralbuch (Algorithmus und Datenstruktur) mit Python!
Holen Sie sich zusätzliche Daten zu LDAP mit Python (Writer und Reader)
Kommunizieren Sie mit FX-5204PS mit Python und PyUSB
Umgebungskonstruktion von Python und OpenCV
Leuchtendes Leben mit Python und OpenCV
Beginnen Sie mit Python! ~ ① Umweltbau ~
Roboter läuft mit Arduino und Python
Installieren Sie Python 2.7.9 und Python 3.4.x mit pip.
Neuronales Netzwerk mit OpenCV 3 und Python 3
AM-Modulation und Demodulation mit Python
Scraping mit Node, Ruby und Python
Scraping mit Python, Selen und Chromedriver
Kratzen mit Python und schöner Suppe
[Python] Mit DataReader Wirtschaftsdaten abrufen
JSON-Codierung und -Decodierung mit Python
Hadoop-Einführung und MapReduce mit Python
[GUI in Python] PyQt5-Drag & Drop-
Python-Datenstruktur mit Chemoinfomatik gelernt
Hashing von Daten in R und Python
Lesen und Schreiben von NetCDF mit Python
Ich habe mit PyQt5 und Python3 gespielt
Python3-Umgebungskonstruktion mit pyenv-virtualenv (CentOS 7.3)
Visualisieren Sie Ihre Daten ganz einfach mit Python Seaborn.
Lesen und Schreiben von CSV mit Python
Mehrfachintegration mit Python und Sympy
Verarbeiten Sie Pubmed .xml-Daten mit Python
pytorch @ python3.8 Umgebungskonstruktion mit pipenv
Datenanalyse beginnend mit Python (Datenvisualisierung 1)
Koexistenz von Python2 und 3 mit CircleCI (1.0)
Datenanalyse beginnend mit Python (Datenvisualisierung 2)
Anwendung von Python: Datenbereinigung Teil 2: Datenbereinigung mit DataFrame
Sugoroku-Spiel und Zusatzspiel mit Python
FM-Modulation und Demodulation mit Python
Erstellen einer Umgebung mit pyenv und pyenv-virtualenv
Holen Sie sich Daten von VPS MySQL mit Python 3 und SQL Alchemy
Verschieben Sie Daten mit Python Change / Delete (Writer und Reader) nach LDAP.
Aufbau einer LaTeX- und R-Umgebung (ein wenig Python) mit SublimeText3 (Windows)
Kommunizieren Sie mit gRPC zwischen Elixir und Python
Holen Sie sich mit Python zusätzliche Daten zu LDAP
[Ubuntu 18.04] Erstellen Sie eine Python-Umgebung mit pyenv + pipenv
Berechnen Sie das Standardgewicht und zeigen Sie es mit Python an
Empfangen Sie Textdaten von MySQL mit Python