[PYTHON] Worauf ich bei der Verwendung von Airflow gestoßen bin

Einführung

Beim Aufbau der Dateninfrastruktur haben wir Airflow als Datenpipeline übernommen. Zu dieser Zeit gab es einige Stolperpunkte, deshalb werde ich sie aufschreiben.

Luftstrom bei uns

Wir entwickeln und betreiben mehrere Systeme, die maschinelles Lernen verwenden. Da die Anzahl der Projekte zugenommen hat und der Betrieb fortgeschritten ist, ist es notwendig geworden, die folgenden Anforderungen gemeinsam zu erfüllen.

Aus diesem Grund haben wir entschieden, dass die Dateninfrastruktur eine notwendige Phase ist, und beschlossen, sie aufzubauen. Beim Aufbau einer Dateninfrastruktur müssen die Originaldaten für ein Data Warehouse oder einen Data Mart verarbeitet werden.

Zu diesem Zeitpunkt musste die Datenpipeline die folgenden Anforderungen erfüllen.

--Wenn die Logik identisch ist, sind die Daten, die endgültig erstellt werden, auch in Situationen gleich, in denen das Anhalten und Fortsetzen aufgrund eines Fehlers oder das erneute Schlagen von Anfang an mit den erstellten Daten erfolgt.

Wir haben Airflow als Werkzeug eingeführt, das diese Anforderungen zu erfüllen scheint.

Es ist der untere Teil des konzeptionellen Diagramms. データ基盤概念図with_Airflow.png

Aiflow hat es implementiert, um die oben genannten Anforderungen zu erfüllen.

Annahme

Die Version von Airflow ist "1.10.5".

Stolpern 1. Ausführungsdatum und Ausführungsdatum

default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2019, 12, 1),
}
dag = DAG('tutorial1', default_args=default_args,
          schedule_interval=timedelta(days=1))

Dieser Code beginnt mit "Ausführungsdatum" vom "2019-12-01T 00: 00: 00 + 00: 00" Es wird wie 12/1, 12/2, 12/3 ... ausgeführt, und wenn die vergangene Ausführung abgeschlossen ist, wird es jeden Tag ausgeführt. Unter der Annahme, dass heute "2019-12-06T01: 00: 00 + 00: 00 (UTC)" ist, wie lange wird das Ausführungsdatum ausgeführt?

Die Antwort ist, dass TaskInstances bis 2019-12-05T 00: 00: 00 + 00: 00 (UTC) ausgeführt werden. Ich habe falsch verstanden, dass das Ausführungsdatum bis zum 06.12. Laufen würde, wenn das heutige Datum 06.12. Ist. Unten ist ein Bilddiagramm.

execution_date_flow.png

Darüber hinaus, wenn es Anforderungen gibt, wie z. B. die Zeit in der Zeitzone verarbeiten zu wollen: "Asien / Tokio" innerhalb der Aufgabe Seien Sie vorsichtig, da dies verwirrend sein kann.

Stolpern 2. Warten Sie auf die Task-Instanz des Vortages

Weil es notwendig war, die heutige Verarbeitung mit dem Ausführungsergebnis von TaskInstance vom Vortag durchzuführen Die heutige TaskInstance musste auf den Erfolg von TaskInstance am Vortag warten. Daher habe ich wait_for_downstream verwendet, um auf das Ergebnis einer bestimmten Aufgabe in der vorherigen TaskInstance zu warten.

t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    wait_for_downstream=True,
    dag=dag)

"Wait_for_downstream" wartet jedoch nicht auf das Ergebnis der vorherigen gesamten TaskInstance.

t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    wait_for_downstream=True,
    dag=dag)

t2 = BashOperator(
    task_id='print2',
    bash_command='echo 2',
    wait_for_downstream=True,
    dag=dag)

t1 >> t2

Wenn Sie schreiben, wird die t1-Task für diese Zeit ausgeführt, wenn die t1-Task für die vorherige Zeit abgeschlossen ist (ohne auf den Abschluss von t2 zu warten). Die t1-Task für diese Zeit muss jedoch sowohl auf die t1- als auch auf die t2-Task für die vorherige Zeit warten. Also habe ich "ExternalTaskSensor" verwendet und es so eingestellt, dass es zum letzten Mal auf die letzte Aufgabe wartet.

t_check_previous_dag_run = ExternalTaskSensor(
    task_id='is_success_pre_dag_run',
    external_dag_id=dag.dag_id,
    allowed_states=['success', 'skipped'],
    external_task_id='your_last_task_id',
    execution_delta=timedelta(days=1)
)

#t1 ist die erste Aufgabe, die Sie ausführen möchten
t_check_previous_dag_run >> t1

TaskInstance (Ausführungsdatum = Startdatum) funktioniert jedoch zuerst nur mit dieser Beschreibung Wartet weiterhin auf den Abschluss von Aufgaben, die nicht vorhanden sind, und fährt nicht fort.

Deshalb weiter

# is_initial ist eine Funktion, um festzustellen, ob es sich um den ersten Ausführungsbenutzer handelt_defined_Festlegen und in Makros verwenden
t_check_is_initial = BranchPythonOperator(
    task_id='is_initial',
    python_callable=lambda is_initial_str: 'do_nothing' if is_initial_str == 'True' else 'is_success_pre_dag_run',  # NOQA
    op_args=['{{ is_initial(execution_date) }}']
)

t_do_nothing = DummyOperator(
    task_id='do_nothing'
)

#auslösen, um nicht übersprungen zu werden_rule='none_failed'Der Satz
t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    trigger_rule='none_failed',
    dag=dag)

t_check_is_initial >> t_check_previous_dag_run >> t1
t_check_is_initial >> t_do_nothing >> t1

Ich habe einen Code wie diesen geschrieben und ihn vermieden, indem ich bei der ersten Ausführung "ExternalTaskSensor" übersprungen habe.

externaltasksensor.png

Es wurde überflüssig, aber es wurde klar, am Vortag auf TaskInstance zu warten.

Es ist jedoch immer noch redundant. Bitte unterrichten Sie, wenn Sie eine andere Möglichkeit kennen, auf das Ausführungsergebnis des vorherigen Tages zu warten.

Stolpern 3. ShortCircuitOperator, Statusregeln überspringen

ShortCircuitOperator gibt den Status der nachfolgenden Aufgaben zum Überspringen an all, wenn die mit python_callable deklarierte Funktion false zurückgibt. Daher ist es nicht möglich, die Aufgabe unmittelbar danach zu überspringen, sondern die Aufgabe weiter vorne auszuführen.

スクリーンショット 2019-12-06 22.51.16.png

Im obigen Beispiel können Sie die Kurzschlussaufgabe (ShortCircuitOperator) nicht verwenden, um print2_2 zu überspringen und die Abschlussaufgabe auszuführen.

In "BranchPythonOperator" passiert etwas Ähnliches, wenn die Trigger-Regel der nachfolgenden Task auf den Standardwert "all_success" gesetzt wird.

t0 = BranchPythonOperator(
    task_id='first_and_branch',
    python_callable=lambda: 'print1',
    dag=dag)

t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    dag=dag)

t2 = BashOperator(
    task_id='print2',
    bash_command='echo 2',
    dag=dag)

t3 = BashOperator(
    task_id='finish',
    bash_command='echo finish',
    dag=dag
)

t0 >> t1
t0 >> t2
t1 >> t3
t2 >> t3
スクリーンショット 2019-12-06 23.10.15.png

Wenn die Trigger-Regel der Zielaufgabe "all_sucess" lautet, befindet sie sich im Überspringstatus, wenn sich eine der übergeordneten Aufgaben im Überspringstatus befindet.

Wenn Sie möchten, dass die Zielaufgabe ausgeführt wird, wenn keine der übergeordneten Aufgaben einen Fehlerstatus hat Wenn Sie trigger_rule wie unten gezeigt auf "no_failed" setzen, funktioniert dies wie erwartet.

t3 = BashOperator(
    task_id='finish',
    bash_command='echo finish',
    trigger_rule='none_failed',
    dag=dag
)

Wenn der Task-Teil first_and_branch "ShortCircuitOperator" ist und das Ergebnis von python_callable false ist, befinden sich alle nachfolgenden Tasks unabhängig von trigger_rule im Status "Überspringen".

Stolpern 4. Benachrichtigung bei Ausfall

Verwenden Sie default_args, um Slack-Benachrichtigungen zu senden, wenn eine Aufgabe fehlschlägt Ich habe es wie folgt geschrieben.

def send_slack():
    #Prozess zur Ausgabe einer Slack-Benachrichtigung

default_args = {
    'start_date': datetime(2019, 12, 1),
    'on_failure_callback': send_slack
}

In dieser Schreibweise wird diese Tatsache jedoch nicht auf dem Luftstromverwaltungsbildschirm angezeigt, wenn die Slack-Benachrichtigung aus irgendeinem Grund nicht gesendet wird. Infolgedessen war es manchmal nicht möglich zu bemerken, dass die Slack-Benachrichtigungsaufgabe selbst unterbrochen war. Wenn Sie also klar angeben, dass die Slack-Benachrichtigung am Ende der Aufgabe gesendet wird (siehe unten), können Sie dies auch auf dem Verwaltungsbildschirm feststellen, wenn die Slack-Benachrichtigung selbst fehlgeschlagen ist.

t_finish = DummyOperator(
    task_id='task_finish',
    trigger_rule='none_failed',
)

#Verwenden Sie Operator, um Ihre eigenen Slack-Benachrichtigungen zu senden
# trigger_Benachrichtigungen werden unabhängig von Erfolg oder Misserfolg übersprungen, indem Aufgaben nach Regeln zugewiesen werden
t_notification_on_success = CustomSlackOperator(
    task_id='notification_on_success',
    trigger_rule='none_failed'
)

t_notification_on_failed = CustomSlackOperator(
    task_id='notification_on_failed',
    is_success=False,
    trigger_rule='one_failed'
)

t_finish >> t_notification_on_success
t_finish >> t_notification_on_failed
スクリーンショット 2019-12-06 22.36.00.png

Es ist möglich, dass die Benachrichtigung selbst fehlschlägt, weil die Einstellungen auf der lockeren Seite geändert wurden, bevor Sie es wissen. Ich denke, es ist sicherer, auch die Benachrichtigungsaufgabe anzugeben.

Zusammenfassung

Ich bin über andere Details gestolpert, Die meisten Muster konnten durch sorgfältiges Lesen der Dokumentation gelöst werden (obwohl ich manchmal den Quellcode gelesen habe).

Luftstrom --Dag kann mit Python-Code flexibel definiert werden

Es ist eines der wenigen Werkzeuge, die solche Dinge tun können. Daher denke ich, dass es ein guter Kandidat ist, wenn mehrere komplexe Aufgaben regelmäßig ausgeführt werden.

Recommended Posts

Worauf ich bei der Verwendung von Airflow gestoßen bin
Worauf ich bei der Verwendung von CodeIgniter auf einem Linux-Server gestoßen bin
Ich bin auf Pylearn 2 gestoßen
Ich bin auf MoviePy gestoßen, machen Sie sich also eine Notiz
Worauf ich mich bei der Verwendung von Tensorflow-gpu eingelassen habe
Ich stolperte über die Installation des Satzstücks auf Ubuntu
Ich habe versucht, parametrisiert zu verwenden
Was ich über KI / maschinelles Lernen mit Python gelernt habe (1)
Ich habe versucht, Argparse zu verwenden
Ich habe versucht, Mimesis zu verwenden
Ich habe versucht, anytree zu verwenden
Ich habe versucht, aiomysql zu verwenden
Ich habe versucht, Summpy zu verwenden
Ich habe versucht, Coturn zu verwenden
Ich habe versucht, Pipenv zu verwenden
Ich habe versucht, Matplotlib zu verwenden
Ich habe versucht, "Anvil" zu verwenden.
Ich habe versucht, Hubot zu verwenden
Ich habe versucht, ESPCN zu verwenden
Ich habe versucht, openpyxl zu verwenden
Ich habe versucht, Ipython zu verwenden
Ich habe versucht, PyCaret zu verwenden
Was ich über KI / maschinelles Lernen mit Python gelernt habe (3)
Ich habe versucht, Cron zu verwenden
Ich habe versucht, ngrok zu verwenden
Ich habe versucht, face_recognition zu verwenden
Ich habe versucht, Jupyter zu verwenden
Ich habe versucht, doctest zu verwenden
Ich habe versucht, Folium zu verwenden
Ich habe versucht, jinja2 zu verwenden
Ich habe versucht, Folium zu verwenden
Wovon ich süchtig war, als ich Python Tornado benutzte
Ich habe versucht, das Zeitfenster zu verwenden
Ich bin beim Einfügen von TensorFlow mit pip auf PyUnicodeUCS4_FromStringAndSize gestoßen
Was ich über KI / maschinelles Lernen mit Python gelernt habe (2)
Dinge, die Sie bei der Implementierung von Airflow mit Docker-Compose beachten sollten
Was ich über KI / maschinelles Lernen mit Python gelernt habe (4)
Was hat sich geändert, seit ich Visual Studio Code verwende?
Was wurde gefragt, wenn Random Forest in der Praxis verwendet wurde?
Ich bin auf TensorFlow gestoßen (Was ist außerhalb des GPU-Speichers)?
[Ich habe versucht, Pythonista 3 zu verwenden] Einführung
Ich habe versucht, easydict (Memo) zu verwenden.
Ich habe versucht, das Gesicht mit Face ++ zu erkennen
Ich habe versucht, RandomForest zu verwenden
Ich habe versucht, BigQuery ML zu verwenden
Wo ich auf SQLite3 gestoßen bin
Ich habe versucht, Amazon Glacier zu verwenden
Ich habe versucht, Git Inspector zu verwenden
Was ich über Linux gelernt habe
Ich habe versucht, Magenta / TensorFlow zu verwenden
Was ich in Python gelernt habe
Ich habe versucht, AWS Chalice zu verwenden
Ich habe versucht, Slack Emojinator zu verwenden
Schreiben Sie auf, was ich bei der Formatierung von Pos-Daten mit ipython verwendet habe