Beim Aufbau der Dateninfrastruktur haben wir Airflow als Datenpipeline übernommen. Zu dieser Zeit gab es einige Stolperpunkte, deshalb werde ich sie aufschreiben.
Wir entwickeln und betreiben mehrere Systeme, die maschinelles Lernen verwenden. Da die Anzahl der Projekte zugenommen hat und der Betrieb fortgeschritten ist, ist es notwendig geworden, die folgenden Anforderungen gemeinsam zu erfüllen.
Aus diesem Grund haben wir entschieden, dass die Dateninfrastruktur eine notwendige Phase ist, und beschlossen, sie aufzubauen. Beim Aufbau einer Dateninfrastruktur müssen die Originaldaten für ein Data Warehouse oder einen Data Mart verarbeitet werden.
Zu diesem Zeitpunkt musste die Datenpipeline die folgenden Anforderungen erfüllen.
--Wenn die Logik identisch ist, sind die Daten, die endgültig erstellt werden, auch in Situationen gleich, in denen das Anhalten und Fortsetzen aufgrund eines Fehlers oder das erneute Schlagen von Anfang an mit den erstellten Daten erfolgt.
Wir haben Airflow als Werkzeug eingeführt, das diese Anforderungen zu erfüllen scheint.
Es ist der untere Teil des konzeptionellen Diagramms.
Aiflow hat es implementiert, um die oben genannten Anforderungen zu erfüllen.
Die Version von Airflow ist "1.10.5".
default_args = {
'owner': 'Airflow',
'start_date': datetime(2019, 12, 1),
}
dag = DAG('tutorial1', default_args=default_args,
schedule_interval=timedelta(days=1))
Dieser Code beginnt mit "Ausführungsdatum" vom "2019-12-01T 00: 00: 00 + 00: 00" Es wird wie 12/1, 12/2, 12/3 ... ausgeführt, und wenn die vergangene Ausführung abgeschlossen ist, wird es jeden Tag ausgeführt. Unter der Annahme, dass heute "2019-12-06T01: 00: 00 + 00: 00 (UTC)" ist, wie lange wird das Ausführungsdatum ausgeführt?
Die Antwort ist, dass TaskInstances bis 2019-12-05T 00: 00: 00 + 00: 00 (UTC) ausgeführt werden. Ich habe falsch verstanden, dass das Ausführungsdatum bis zum 06.12. Laufen würde, wenn das heutige Datum 06.12. Ist. Unten ist ein Bilddiagramm.
Darüber hinaus, wenn es Anforderungen gibt, wie z. B. die Zeit in der Zeitzone verarbeiten zu wollen: "Asien / Tokio" innerhalb der Aufgabe Seien Sie vorsichtig, da dies verwirrend sein kann.
Weil es notwendig war, die heutige Verarbeitung mit dem Ausführungsergebnis von TaskInstance vom Vortag durchzuführen
Die heutige TaskInstance musste auf den Erfolg von TaskInstance am Vortag warten.
Daher habe ich wait_for_downstream
verwendet, um auf das Ergebnis einer bestimmten Aufgabe in der vorherigen TaskInstance zu warten.
t1 = BashOperator(
task_id='print1',
bash_command='echo 1',
wait_for_downstream=True,
dag=dag)
"Wait_for_downstream" wartet jedoch nicht auf das Ergebnis der vorherigen gesamten TaskInstance.
t1 = BashOperator(
task_id='print1',
bash_command='echo 1',
wait_for_downstream=True,
dag=dag)
t2 = BashOperator(
task_id='print2',
bash_command='echo 2',
wait_for_downstream=True,
dag=dag)
t1 >> t2
Wenn Sie schreiben, wird die t1-Task für diese Zeit ausgeführt, wenn die t1-Task für die vorherige Zeit abgeschlossen ist (ohne auf den Abschluss von t2 zu warten). Die t1-Task für diese Zeit muss jedoch sowohl auf die t1- als auch auf die t2-Task für die vorherige Zeit warten. Also habe ich "ExternalTaskSensor" verwendet und es so eingestellt, dass es zum letzten Mal auf die letzte Aufgabe wartet.
t_check_previous_dag_run = ExternalTaskSensor(
task_id='is_success_pre_dag_run',
external_dag_id=dag.dag_id,
allowed_states=['success', 'skipped'],
external_task_id='your_last_task_id',
execution_delta=timedelta(days=1)
)
#t1 ist die erste Aufgabe, die Sie ausführen möchten
t_check_previous_dag_run >> t1
TaskInstance (Ausführungsdatum = Startdatum) funktioniert jedoch zuerst nur mit dieser Beschreibung Wartet weiterhin auf den Abschluss von Aufgaben, die nicht vorhanden sind, und fährt nicht fort.
Deshalb weiter
# is_initial ist eine Funktion, um festzustellen, ob es sich um den ersten Ausführungsbenutzer handelt_defined_Festlegen und in Makros verwenden
t_check_is_initial = BranchPythonOperator(
task_id='is_initial',
python_callable=lambda is_initial_str: 'do_nothing' if is_initial_str == 'True' else 'is_success_pre_dag_run', # NOQA
op_args=['{{ is_initial(execution_date) }}']
)
t_do_nothing = DummyOperator(
task_id='do_nothing'
)
#auslösen, um nicht übersprungen zu werden_rule='none_failed'Der Satz
t1 = BashOperator(
task_id='print1',
bash_command='echo 1',
trigger_rule='none_failed',
dag=dag)
t_check_is_initial >> t_check_previous_dag_run >> t1
t_check_is_initial >> t_do_nothing >> t1
Ich habe einen Code wie diesen geschrieben und ihn vermieden, indem ich bei der ersten Ausführung "ExternalTaskSensor" übersprungen habe.
Es wurde überflüssig, aber es wurde klar, am Vortag auf TaskInstance zu warten.
Es ist jedoch immer noch redundant. Bitte unterrichten Sie, wenn Sie eine andere Möglichkeit kennen, auf das Ausführungsergebnis des vorherigen Tages zu warten.
ShortCircuitOperator
gibt den Status der nachfolgenden Aufgaben zum Überspringen an all
, wenn die mit python_callable deklarierte Funktion false zurückgibt.
Daher ist es nicht möglich, die Aufgabe unmittelbar danach zu überspringen, sondern die Aufgabe weiter vorne auszuführen.
Im obigen Beispiel können Sie die Kurzschlussaufgabe (ShortCircuitOperator) nicht verwenden, um print2_2 zu überspringen und die Abschlussaufgabe auszuführen.
In "BranchPythonOperator" passiert etwas Ähnliches, wenn die Trigger-Regel der nachfolgenden Task auf den Standardwert "all_success" gesetzt wird.
t0 = BranchPythonOperator(
task_id='first_and_branch',
python_callable=lambda: 'print1',
dag=dag)
t1 = BashOperator(
task_id='print1',
bash_command='echo 1',
dag=dag)
t2 = BashOperator(
task_id='print2',
bash_command='echo 2',
dag=dag)
t3 = BashOperator(
task_id='finish',
bash_command='echo finish',
dag=dag
)
t0 >> t1
t0 >> t2
t1 >> t3
t2 >> t3
Wenn die Trigger-Regel der Zielaufgabe "all_sucess" lautet, befindet sie sich im Überspringstatus, wenn sich eine der übergeordneten Aufgaben im Überspringstatus befindet.
Wenn Sie möchten, dass die Zielaufgabe ausgeführt wird, wenn keine der übergeordneten Aufgaben einen Fehlerstatus hat Wenn Sie trigger_rule wie unten gezeigt auf "no_failed" setzen, funktioniert dies wie erwartet.
t3 = BashOperator(
task_id='finish',
bash_command='echo finish',
trigger_rule='none_failed',
dag=dag
)
Wenn der Task-Teil first_and_branch "ShortCircuitOperator" ist und das Ergebnis von python_callable false ist, befinden sich alle nachfolgenden Tasks unabhängig von trigger_rule im Status "Überspringen".
Verwenden Sie default_args, um Slack-Benachrichtigungen zu senden, wenn eine Aufgabe fehlschlägt Ich habe es wie folgt geschrieben.
def send_slack():
#Prozess zur Ausgabe einer Slack-Benachrichtigung
default_args = {
'start_date': datetime(2019, 12, 1),
'on_failure_callback': send_slack
}
In dieser Schreibweise wird diese Tatsache jedoch nicht auf dem Luftstromverwaltungsbildschirm angezeigt, wenn die Slack-Benachrichtigung aus irgendeinem Grund nicht gesendet wird. Infolgedessen war es manchmal nicht möglich zu bemerken, dass die Slack-Benachrichtigungsaufgabe selbst unterbrochen war. Wenn Sie also klar angeben, dass die Slack-Benachrichtigung am Ende der Aufgabe gesendet wird (siehe unten), können Sie dies auch auf dem Verwaltungsbildschirm feststellen, wenn die Slack-Benachrichtigung selbst fehlgeschlagen ist.
t_finish = DummyOperator(
task_id='task_finish',
trigger_rule='none_failed',
)
#Verwenden Sie Operator, um Ihre eigenen Slack-Benachrichtigungen zu senden
# trigger_Benachrichtigungen werden unabhängig von Erfolg oder Misserfolg übersprungen, indem Aufgaben nach Regeln zugewiesen werden
t_notification_on_success = CustomSlackOperator(
task_id='notification_on_success',
trigger_rule='none_failed'
)
t_notification_on_failed = CustomSlackOperator(
task_id='notification_on_failed',
is_success=False,
trigger_rule='one_failed'
)
t_finish >> t_notification_on_success
t_finish >> t_notification_on_failed
Es ist möglich, dass die Benachrichtigung selbst fehlschlägt, weil die Einstellungen auf der lockeren Seite geändert wurden, bevor Sie es wissen. Ich denke, es ist sicherer, auch die Benachrichtigungsaufgabe anzugeben.
Ich bin über andere Details gestolpert, Die meisten Muster konnten durch sorgfältiges Lesen der Dokumentation gelöst werden (obwohl ich manchmal den Quellcode gelesen habe).
Luftstrom --Dag kann mit Python-Code flexibel definiert werden
Es ist eines der wenigen Werkzeuge, die solche Dinge tun können. Daher denke ich, dass es ein guter Kandidat ist, wenn mehrere komplexe Aufgaben regelmäßig ausgeführt werden.
Recommended Posts