Dieser Artikel ist eine japanische Übersetzung von Erstellen von Datenpipelines mit Python und Luigi. Der ursprüngliche Artikel war gut gemacht, also habe ich versucht, ihn zu übersetzen, obwohl ich mir nicht sicher war, für mein eigenes Verständnis. Wenn Sie Fehler haben, teilen Sie uns dies bitte in den Kommentaren mit.
Für Datenwissenschaftler ist der tägliche Betrieb oft mehr Forschung und Entwicklung als das Engineering. Der Prozess vom Prototyp zum Produkt erfordert jedoch einiges an Umbauaufwand, wobei schnelle und schlammige Entscheidungen die nächstbeste Option sind ^ 1. Dies verzögert immer die Innovation und im Allgemeinen das gesamte Projekt.
In diesem Artikel werden die Erfahrungen beim Erstellen einer Datenpipeline erläutert: Alle allgemeinen Schritte, die zum Aufbereiten von Daten für ein datengesteuertes Produkt erforderlich sind, z. B. Datenextraktion, Bereinigung, Zusammenführung und Vorverarbeitung. Besonderes Augenmerk liegt auf ** Daten-Piping ** und darauf, wie ein Workflow-Manager wie Luigi ein Retter sein kann, ohne Sie zu stören. Mit minimalem Aufwand wird der Übergang vom Prototyp zum Produkt reibungslos verlaufen.
Ein Beispielcode ist unter [GitHub Gist] verfügbar (https://gist.github.com/bonzanini/40774bf9348d35d9ea4f).
In früheren Prototypen sah die Datenpipeline ungefähr so aus:
$ python get_some_data.py
$ python clean_some_data.py
$ python join_other_data.py
$ python do_stuff_with_data.py
In der vorbereitenden experimentellen Phase eines Datenprojekts sind folgende Dinge häufig anzutreffen: Eine Vorverarbeitung ist erforderlich, was wahrscheinlich zu einem schnellen Hack führt ^ 2, sodass es von Best Practices für das Engineering geplagt wird. Und die Anzahl der Skripte schwillt an und die Datenpipeline wird gelöscht.
Dieser Ansatz hat nur den Vorteil, schnell und hackig zu sein. Der Nachteil ist, dass es langweilig ist: Sie möchten die Pipeline jedes Mal neu ausführen und müssen eine Reihe von Skripten nacheinander manuell aufrufen. Darüber hinaus gibt es viele Missverständnisse, wenn dieser Prototyp mit Kollegen geteilt wird (z. B. "Warum funktioniert" do_stuff_with_data "nicht?", "Haben Sie zuerst" clean_some_data "ausgeführt?" Usw.).
Die scheinbar hackige Lösung scheint alles in einem Skript zusammenzufassen. Nach einigem schnellen Refactoring würde das Skript "do_everything.py" folgendermaßen aussehen:
if __name__ == '__main__':
get_some_data()
clean_some_data()
join_other_data()
do_stuff_with_data()
Leicht zu schaffen:
$ python do_everything.py
(Hinweis: Sie können alles in einem Bash-Skript zusammenfassen, das eine Reihe von Skripten nacheinander aufruft, aber die Nachteile bleiben gleich.)
Wenn wir in die produktreife Pipeline übergehen, müssen wir ein wenig über die Aspekte des Codes nachdenken, die alle Beispiele ausführen. Insbesondere sollte die Fehlerbehandlung berücksichtigt werden:
try:
get_some_data()
except GetSomeDataError as e:
#Fehlerbehandlung
Aber wenn alle Aufgaben zusammengestellt sind, wird es zu einem Versuch / außer Weihnachtsbaum:
try:
get_some_data()
try:
clean_some_data()
try:
#Mach hier etwas...
except EvenMoreErrors:
# ...
except CleanSomeDataError as e:
#Behandeln Sie CleanSomeDataError
except GetSomeDataError as e:
#Behandeln Sie GetSomeDataError
Ein weiterer wichtiger Aspekt ist die Wiederherstellung der Pipeline. Wenn beispielsweise die ersten Aufgaben abgeschlossen sind, aber unterwegs ein Fehler auftritt, wie können Sie die Pipeline erneut ausführen, ohne den ersten erfolgreichen Schritt erneut auszuführen?
#Überprüfen Sie, ob die Aufgabe bereits erfolgreich ist
if not i_got_the_data_already():
#Wenn nicht, mach es
try:
get_some_date()
except GetSomeDataError as e:
#Fehlerbehandlung
Luigi ist ein Python-Tool für das Workflow-Management, das von Spotify entwickelt wurde, um komplexe Daten-Pipelines für Batch-Jobs zu erstellen. Die Installation von Luigi ist:
pip install luigi
Die nützlichen Funktionen von Luigi sind:
Es gibt zwei Schlüsselkonzepte, um zu verstehen, wie Luigi auf die Datenpipeline angewendet werden kann: Aufgaben und Ziele. Eine Aufgabe ist eine Sammlung von Aufgaben, die durch Erben der Klasse "luigi.Task" und Überschreiben einiger grundlegender Methoden dargestellt werden. Die Ausgabe der Aufgabe ist das Ziel, bei dem es sich möglicherweise um das lokale Dateisystem, Amazon S3 oder die Datenbank handelt.
Abhängigkeiten können von Ein- und Ausgängen definiert werden. Wenn beispielsweise Aufgabe B von Aufgabe A abhängt, bedeutet dies, dass die Ausgabe von Aufgabe A die Eingabe von Aufgabe B ist.
Schauen wir uns einige typische Aufgaben an:
# Filename: run_luigi.py
import luigi
class PrintNumbers(luigi.Task):
def requires(self):
return []
def output(self):
return luigi.LocalTarget("numbers_up_to_10.txt")
def run(self):
with self.output().open('w') as f:
for i in range(1, 11):
f.write("{}\n".format(i))
class SquaredNumbers(luigi.Task):
def requires(self):
return [PrintNumbers()]
def output(self):
return luigi.LocalTarget("squares.txt")
def run(self):
with self.input()[0].open() as fin, self.output().open('w') as fout:
for line in fin:
n = int(line.strip())
out = n * n
fout.write("{}:{}\n".format(n, out))
if __name__ == '__main__':
luigi.run()
Dieser Code enthält zwei Aufgaben: "PrintNumbers", mit denen Zahlen von 1 bis 10 zeilenweise in eine Datei mit dem Namen "number_up_to_10.txt" geschrieben werden, und "squares", die diese Datei lesen und zeilenweise mit quadratischen Zahlen paaren. Es ist eine "Quadratische Zahl", die in eine Datei namens ".txt" schreibt.
So führen Sie diese Aufgabe aus:
$ python run_luigi.py SquaredNumbers --local-scheduler
Luigi erwägt die Abhängigkeitsprüfung zwischen Aufgaben und stellt fest, dass keine "SquaredNumbers" -Eingabe vorhanden ist. Führen Sie daher zuerst die "PrintNumbers" -Aufgabe und dann die "SquaredNumbers" aus.
Das erste Argument, das Sie an Luigi übergeben, ist der Name der letzten Aufgabe in der Pipeline, die Sie ausführen möchten. Das zweite Argument weist Luigi einfach an, den lokalen Scheduler zu verwenden (dazu später mehr).
Sie können auch den Befehl luigi
verwenden:
$ luigi -m run_luigi.py SquaredNumbers --local-scheduler
Um eine Luigi-Aufgabe zu erstellen, erstellen Sie einfach eine Klasse mit "luigi.Task" als übergeordnetem Element und überschreiben Sie einige Methoden. Insbesondere:
--requires ()
ist eine Liste abhängiger Aufgaben
--output ()
ist das Ziel der Aufgabe (z. B. LocalTarget, S3Target usw.)
--run ()
ist die Ausführungslogik
Ist. Luigi überprüft die Rückgabewerte von "require ()" und "output ()" und erstellt entsprechend einen Abhängigkeitsgraphen.
Fest codierte Dateinamen und Einstellungen sind im Allgemeinen Anti-Patterns. Sobald Sie die Struktur und Dynamik einer Aufgabe verstanden haben, sollten Sie Ihre Einstellungen so parametrisieren, dass Sie dasselbe Skript dynamisch mit unterschiedlichen Argumenten aufrufen können.
Das ist die Klasse luigi.Parameter ()
. Jede Luigi-Aufgabe kann mehrere Parameter haben. Nehmen wir zum Beispiel im vorherigen Beispiel an, dass Sie die Nummer ändern können. Da wir Ganzzahlen als Parameter für die Funktion range ()
verwenden, können wir anstelle der Standardparameterklasse luigi.IntParameter
verwenden. Die geänderte Aufgabe sieht folgendermaßen aus:
class PrintNumbers(luigi.Task):
n = luigi.IntParameter()
def requires(self):
return []
def output(self):
return luigi.LocalTarget("numbers_up_to_{}.txt".format(self.n))
def run(self):
with self.output().open('w') as f:
for i in range(1, self.n+1):
f.write("{}\n".format(i))
class SquaredNumbers(luigi.Task):
n = luigi.IntParameter()
def requires(self):
return [PrintNumbers(n=self.n)]
def output(self):
return luigi.LocalTarget("squares_up_to_{}.txt".format(self.n))
def run(self):
with self.input()[0].open() as fin, self.output().open('w') as fout:
for line in fin:
n = int(line.strip())
out = n * n
fout.write("{}:{}\n".format(n, out))
So erhöhen Sie die Aufgabe "SquaredNumbers" auf 20 und rufen sie auf:
$ python run_luigi.py SquaredNumbers --local-scheduler --n 20
Parameter können auch Standardwerte haben. Zum Beispiel:
n = luigi.IntParameter(default=10)
In diesem Fall wird 10 verwendet, sofern nicht das Argument "--n" angegeben ist.
Früher habe ich die Option "--local-scheduler" verwendet, als ich Luigis Aufgaben auf dem lokalen Scheduler ausführte. Dies ist nützlich für die Entwicklung, aber für Produktumgebungen sollten Sie einen zentralen Scheduler verwenden (siehe Dokumentation unter Scheduler).
Dies hat mehrere Vorteile:
So führen Sie den Luigi-Scheduler-Daemon im Vordergrund aus:
$ luigid
Im Hintergrund:
$ luigid --background
Standardmäßig wird Port 8082 verwendet, sodass Sie die Visualisierung anzeigen können, indem Sie mit Ihrem Browser auf http: // localhost: 8082 / zugreifen.
Wenn der globale Luigi-Scheduler ausgeführt wird, kann er ohne Optionen für den lokalen Scheduler erneut ausgeführt werden:
$ python run_luigi.py SquaredNumbers --n [BIG_NUMBER]
Der Beispielcode endet in Millisekunden. Wenn Sie jedoch zum Browser wechseln und das Abhängigkeitsdiagramm anzeigen möchten, während die Aufgabe noch ausgeführt wird, möchten Sie wahrscheinlich eine große Anzahl, z. B. 10.000.000 oder mehr, für die Option "--n". Du solltest es geben.
Ein Screenshot des Abhängigkeitsgraphen ist:
Wir haben die Definition einer Datenpipeline mit Luigi, einem in Python geschriebenen Workflow-Manager, besprochen. Luigi bietet eine schöne Abstraktion der Pipeline mit Aufgaben und Zielen und berücksichtigt auch Abhängigkeiten für Sie.
Aus der Perspektive der Wiederverwendung von Code und der Einstellung, vom Prototyp zum Produkt zu wechseln, habe ich individuelle Python-Pakete für Geschäftslogikaufgaben. Ich finde es nützlich, es als -and-Distribute-Python-Pakete /) zu definieren (dh mit der Datei setup.py
). Auf diese Weise können Sie einfach "import your_package" aus Ihrem Luigi-Skript deklarieren und von dort aus aufrufen.
Es ist möglich, dass eine Aufgabe mehrere Dateien als Ausgabe generiert. In diesem Fall sollten Sie jedoch wahrscheinlich überlegen, ob die Aufgabe in kleinere Einheiten (dh mehrere Aufgaben) unterteilt werden kann. Sind ihre Ausgaben logisch gleich? Gibt es Abhängigkeiten? Wenn Sie eine Aufgabe nicht aufteilen können, ist es meiner Meinung nach einfach und bequem, output ()
einfach zu einer Protokolldatei zu machen, die den Namen der Aufgabe selbst, den Zeitstempel usw. kombiniert. Der Name der Protokolldatei lautet ungefähr "TaskName_timestamp_param1value_param2value_etc".
Workflow-Manager wie Luigi behandeln Abhängigkeiten, reduzieren die Anzahl der Codevorlagen für die Parameter- und Fehlerbehandlung, verwalten die Fehlerbehebung und folgen klaren Mustern bei der Entwicklung von Datenpipelines. Im Allgemeinen ist es nützlich, weil es tut.
Es ist wichtig, auch die Grenzen zu berücksichtigen:
--Lugi wurde für Batch-Jobs entwickelt und ist daher wahrscheinlich für die Verarbeitung in nahezu Echtzeit nutzlos
Recommended Posts