[PYTHON] Airflow-I hat versucht, den Zeitplan zu programmieren und die Datenpipeline zu überwachen

Wenn Airflow eingeführt wird, tritt ein Fehler bei der Stapelverarbeitung von cron auf. Durch das Abfangen der Protokolldatei kann verhindert werden, dass die Protokollausgabe zu süß ist und die Ursache nicht identifiziert werden kann.

Kennen Sie Airflow?

Planungs- und Überwachungstool für Open Source-Datenpipelines von Airbnb. Einfach ausgedrückt, ein Hochleistungs-Cron, der einen Jobbaum erstellen kann. Es ist eine Open-Source-Software, die in der Python2-Serie entwickelt wurde und per Pip installiert werden kann. Bei der von AWS jährlich stattfindenden Großveranstaltung re: Invent 2015 gaben mehrere Unternehmen bekannt, dass sie Airflow einsetzen, und erregten Aufmerksamkeit. Ich war daran interessiert, Yahoo Announcement zu lesen. Dieser Artikel ist ein Memo, in dem geprüft und überprüft wurde, ob Airflow in das Projekt eingeführt werden soll.

■ Ich habe versucht, die Analyseaufgabe des Projekts auf Airflow zu setzen スクリーンショット 2015-11-09 4.06.06.png

Airflow ist ein System, mit dem Sie den Zeitplan und die Überwachung Ihrer Datenpipeline programmieren können.

Es gab nur wenige japanische Materialien, daher wusste ich zunächst nicht, was Airflow tun kann. Ich möchte die Installationsmethode verschieben und zuerst die Informationen zur Verwendung ergänzen. Nachdem ich es im Rückblick etwa eine Woche lang untersucht habe, denke ich, dass die Beschreibung in der ersten Zeile des Airflow-Repositorys Airflow am besten beschreibt.

Airflow is a system to programmatically author, schedule and monitor data pipelines.(Super Übersetzung:Airflow ist ein System, das durch Programmierung die folgenden Funktionen bietet. Beispiel:Datenpipeline-Zeitplan, Überwachung usw.)

Wenn Airflow für andere Zwecke als zum Planen und Überwachen verwendet wird, z. B. zum Schreiben von Datenoperationsbefehlen oder zum manuellen Ausführen von Aufgaben, wird es sofort schwierig, das System zu verwenden. Daher ist es wichtig, die Verwendung vor der Einführung zu trennen. ist.

Lernen Sie aus einem konkreten Beispiel: Der Prozess des Auffüllens von Google BigQuery aus der Datenbank

Ich habe eine Analyseaufgabe in Airflow geschrieben. Der Grund, warum die Daten einmal in S3 abgelegt werden, besteht darin, dass sich die Erfassungszeit verschiebt, wenn der Prozess in der Mitte fehlschlägt, wenn er erneut von der Datenbank abgerufen wird, und ich den Befehl dump nicht mehrmals am Tag drücken möchte.

■ Technische Daten Speichern Sie einmal täglich die erforderlichen Daten aus MySQL, speichern Sie sie in S3, kopieren Sie sie in Google Cloud Storage und füllen Sie BigQuery. Senden Sie eine E-Mail an die betroffenen Personen, wenn die Daten in Google Cloud Storage installiert sind.

■ Teilen Sie die Spezifikationen in Aufgaben auf

  1. Exportieren Sie MySQL-Daten mithilfe der AWS-Datenpipeline nach Amazon S3 (https://docs.aws.amazon.com/ja_jp/datapipeline/latest/DeveloperGuide/dp-copydata-mysql.html).
  2. Kopieren Sie Daten von S3 in Google Cloud Storage
  3. Geben Sie Daten aus Google Cloud Storage in BigQuery ein
  4. Führen Sie eine E-Mail-Sendeaufgabe aus

■ Was Sie bei der Gestaltung von Airflow nicht tun sollten Wie von Jenkins empfohlen, sollten Jobdetaildefinitionen in jeder Shell oder jedem Befehl zusammengefasst werden. Wenn Sie die Gorigori-Geschäftslogik auf der Airflow-Seite schreiben, ist es schwierig, Aktualisierungen zu verwalten und den Unterschied widerzuspiegeln. Die auf der Luftstromseite zu programmierende Verarbeitung sollte sich auf den Luftstrom und den Zeitplan konzentrieren.

■ Beispiel für die Implementierung eines Luftstrom-Tags Wir werden die Spezifikationen mit Airflow programmieren. Schreiben Sie die Logik in export_db.py (Ich dachte, dass die Implementierung, die nicht als Airflow-Aufgabe erkannt würde, wenn nur ein japanischer Kommentar geschrieben würde, wirklich eine Verschwendung wäre.)

export_db.py


# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='export_db', default_args=args)

CMD_BASE_DIR = '~/analyze/{}'

# cmd file name
EXPORT_DB_TO_S3_CMD = 'export_db_to_s3.sh'
COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD = 'copy_s3_to_google_storage.py'
IMPORT_BIG_QUERY_CMD = 'import_big_query.py'
SEND_MAIL_CMD = 'send_mail.py'


def do_cmd(cmd):
    os.system(cmd)

# define task
# 1. db to s3
task1 = PythonOperator(
    task_id='1.' + EXPORT_DB_TO_S3_CMD,
    python_callable=do_cmd,
    provide_context=True,
    op_kwargs={'cmd': CMD_BASE_DIR.format(EXPORT_DB_TO_S3_CMD)},
    dag=dag)

# 2. s3 to cloud storage
task2 = PythonOperator(
    task_id='2.' + COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD)},
    dag=dag)

# 3. import bq
task3 = PythonOperator(
    task_id='3.' + IMPORT_BIG_QUERY_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(IMPORT_BIG_QUERY_CMD)},
    dag=dag)

# 4. send mail
task4 = PythonOperator(
    task_id='4.' + SEND_MAIL_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(SEND_MAIL_CMD)},
    dag=dag)

# define task stream
# 1 >> 2 >> 3 and 2 >> 4
task1.set_downstream(task2)
task2.set_downstream(task3)
task2.set_downstream(task4)

# define start task
run_this = task1

■ Testen Sie, ob die Aufgabe funktioniert python ~/airflow/dags/export_db.py

■ Starten Sie Airflow neu, um die Aufgabe widerzuspiegeln Ich denke, der Punkt, an dem Airflow neu gestartet werden muss, um die Aufgabe widerzuspiegeln, ist wirklich eine Verschwendung. Aufgrund dieses Nachteils wird nicht empfohlen, Geschäftslogik in die Aufgabe zu schreiben.

■ Bestätigen Sie mit dem Browser, dass die Aufgabe registriert ist Standardmäßig ist export_db auf der obersten Seite unter [http: // localhost: 8080 / admin /](http: // localhost: 8080 / admin /) registriert.

■ Planen Sie die Ausführung Beginnen Sie mit "Luftstromplaner", um gemäß dem in Python definierten Zeitplan ausgeführt zu werden. In diesem Fall sind Startdatum und -zeit auf 7 Tage festgelegt, sodass sie sich endlos in einer Endlosschleife bewegen.

■ Bild 1. Diagrammansicht der angegebenen Aufgabe スクリーンショット 2015-11-09 4.43.41.png

■ Bild 2. Top-DAG-Liste スクリーンショット 2015-11-09 4.15.18.png

■ Bild 3. Baumansicht der Aufgaben スクリーンショット 2015-11-09 4.17.32.png

Ich fand Airflow bequemer als Cron

Dies ist ein Einführungsvorteil im Vergleich zur Operation mit Cron. Es ist viel Subjektivität.

■ 1. Visualisierung der Ausführungszeit für jede Aufgabe Ich finde es großartig, eine Zusammenfassung der Ausführungszeit zu erstellen und sie als Grafik in einer schönen Webansicht anzuzeigen. Es ist auch schwierig, eine Zusammenfassung zu erhalten, wenn Sie einen Stapel mit cron ausführen und ihn im Protokoll ausspucken.

■ 2. Das Fehlerprotokoll ist ohnehin leicht zu sehen Sie können im Web sehen, welche Art von Fehler in der zu welchem Zeitpunkt und zu welcher Minute ausgeführten Aufgabe aufgetreten ist und welche Art von Standardfehler ausgegeben wurde. Es gibt einen großen Unterschied zu der Implementierung, bei der immer wieder Protokolldateien ausgespuckt werden, die nicht einmal von cron gedreht werden. Ich finde es wunderbar, dass ein Fehler auftreten kann und die Protokolldatei gesucht wird. Daher ist die Protokollausgabe so süß, dass die Ursache nicht identifiziert werden kann.

■ 3. Kann den Jobbaum konfigurieren Es kann klar als Ausführung von B-Aufgaben definiert werden, nachdem eine Aufgabe abgeschlossen wurde.

■ 4. Baumänderungen und Änderungen der Ausführungszeit können in git aufgezeichnet werden Da der Zeitplan und der Baum mit Python programmiert werden, bleibt der Änderungsverlauf erhalten, wenn Sie ihn mit git verwalten.

Jenkins und Airflow haben unterschiedliche Verwendungszwecke

Airflow kann Aufgaben nicht manuell ausführen. Da sich die Richtung, die wir als Produkt anstreben, von der von Jenkins unterscheidet, denke ich, dass Airflow es nicht wagt, sie umzusetzen. </ del> (In Airflow war es möglich, Aufgaben manuell auszuführen, indem CeleryExecutor eingeführt wurde. Siehe Probleme hier, warum CeleryExecutor erforderlich ist Issues / 51)) In Airflow wird sogar das Festlegen des Tag-Namens im Python-Befehl definiert. Sie können den Ausführungsstatus nur über die WebGUI überwachen und das Verhalten der Aufgabe überhaupt nicht ändern. Ich denke, das mache ich auch. Der Luftstrom ist ein scharfes Produkt. Wenn Sie diesen Bereich falsch verstehen und versuchen, ihn als alternatives Produkt zu Jenkins zu verwenden, besteht die Möglichkeit, dass Sie ihn nicht verwenden können.

Zusammenfassung: Anwendungen, für die Airflow geeignet ist

Es handelt sich um eine vollautomatische Aufgabe, bei der der Fehler in der Aufgabe nur dann auftrat, wenn der Fehler im ersten Monat auftrat, ohne dass er sich seiner Existenz bewusst war. Ich dachte, dass es für solche Anwendungen geeignet ist.

Einführungsmethode

In meiner lokalen Umgebung, in der MySQL ausgeführt wird, konnte ich es in 10 Minuten installieren und starten und den Vorgang mit einem Browser überprüfen.

install Official Readme.rst Ich habe es beim Lesen der Datei installiert.

mkvirtualenv airflow
mkdir ~/airflow
cd ~/airflow/
pip install airflow[mysql]
export AIRFLOW_HOME=~/airflow
airflow initdb

run airflow webserver -p 8080

Kommunikationsbestätigung

Greifen Sie mit einem Browser auf [http: // localhost: 8080 /](http: // localhost: 8080 / admin /) zu

Definieren Sie die erste Aufgabe

mkdir ~/airflow/dags
touch ~/airflow/dags/__init__.py
touch ~/airflow/dags/export_db.py
# export_db.Schreiben Sie eine Aufgabendefinition in py

Aufgabentest

python ~/airflow/dags/export_db.py

Aufführen

airflow list_dags
airflow list_tasks export_db
airflow list_tasks export_db --tree

Planen Sie die Ausführung

airflow scheduler

Recommended Posts

Airflow-I hat versucht, den Zeitplan zu programmieren und die Datenpipeline zu überwachen
Mit algebraischen Datentypen und objektorientierter Programmierung
Datenpipeline-Aufbau mit Python und Luigi