[PYTHON] Ce que je suis tombé sur l'utilisation d'Airflow

introduction

Lorsque nous avons construit l'infrastructure de données, nous avons adopté Airflow comme pipeline de données. À ce moment-là, il y avait des points d'achoppement, je vais donc les noter.

Airflow dans notre entreprise

Nous développons et exploitons plusieurs systèmes utilisant l'apprentissage automatique. Au fur et à mesure que le nombre de projets a augmenté et que les opérations ont progressé, il est devenu nécessaire de répondre aux exigences communes suivantes.

Par conséquent, nous avons décidé que l'infrastructure de données était une phase nécessaire et avons décidé de la construire. Lors de la création d'une infrastructure de données, il est nécessaire de traiter les données d'origine pour un entrepôt de données ou un magasin de données.

À ce moment-là, le pipeline de données devait répondre aux exigences suivantes.

Nous avons adopté Airflow comme un outil qui semble répondre à ces exigences.

C'est la partie inférieure du diagramme conceptuel. データ基盤概念図with_Airflow.png

Aiflow l'a implémenté pour répondre aux exigences ci-dessus.

supposition

La version d'Airflow est «1.10.5».

Stumbling 1. date_exécution et date d'exécution

default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2019, 12, 1),
}
dag = DAG('tutorial1', default_args=default_args,
          schedule_interval=timedelta(days=1))

Ce code commence par ʻexecution_date from 2019-12-01T 00: 00: 00 + 00: 00 Il est exécuté comme 12/1, 12/2, 12/3 ..., et lorsque l'exécution passée est terminée, il sera exécuté tous les jours. À ce moment, en supposant qu'aujourd'hui est2019-12-06T01: 00: 00 + 00: 00 (UTC)`, combien de temps sera exécuté date_exécution?

La réponse est que TaskInstances jusqu'au 2019-12-05T 00: 00: 00 + 00: 00 (UTC) sera exécuté. J'ai mal compris que execution_date courrait jusqu'au 12/6 alors que la date d'aujourd'hui est 12/6. Voici un diagramme en image.

execution_date_flow.png

En plus de cela, s'il y a des exigences telles que vouloir gérer l'heure dans le fuseau horaire: ʻAsie / Tokyo` dans la tâche Soyez prudent car cela peut prêter à confusion.

Stumble 2. Attendez l'instance de tâche de la veille

Parce qu'il était nécessaire d'effectuer le traitement d'aujourd'hui en utilisant le résultat de l'exécution de TaskInstance de la veille TaskInstance d'aujourd'hui a dû attendre le succès de TaskInstance de la veille. Par conséquent, j'ai utilisé wait_for_downstream pour attendre le résultat d'une tâche particulière dans la TaskInstance précédente.

t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    wait_for_downstream=True,
    dag=dag)

Cependant, wait_for_downstream n'attend pas le résultat de la TaskInstance entière précédente.

t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    wait_for_downstream=True,
    dag=dag)

t2 = BashOperator(
    task_id='print2',
    bash_command='echo 2',
    wait_for_downstream=True,
    dag=dag)

t1 >> t2

Si vous écrivez, la tâche t1 pour cette heure sera exécutée lorsque la tâche t1 pour l'instant précédent sera terminée (sans attendre l'achèvement de t2). Cependant, la tâche t1 pendant ce temps doit attendre à la fois les tâches t1 et t2 pour l'instant précédent. J'ai donc utilisé ʻExternalTaskSensor` et l'ai configuré pour attendre la dernière tâche pour la dernière fois.

t_check_previous_dag_run = ExternalTaskSensor(
    task_id='is_success_pre_dag_run',
    external_dag_id=dag.dag_id,
    allowed_states=['success', 'skipped'],
    external_task_id='your_last_task_id',
    execution_delta=timedelta(days=1)
)

#t1 est la première tâche que vous souhaitez effectuer
t_check_previous_dag_run >> t1

Cependant, TaskInstance (execution_date = start_date) qui fonctionne en premier avec cette seule description Continuera d'attendre l'achèvement des tâches qui n'existent pas et ne se poursuivront pas.

Par conséquent, en outre

# is_initial est une fonction permettant de déterminer s'il s'agit du premier utilisateur d'exécution_defined_Défini et utilisé dans les macros
t_check_is_initial = BranchPythonOperator(
    task_id='is_initial',
    python_callable=lambda is_initial_str: 'do_nothing' if is_initial_str == 'True' else 'is_success_pre_dag_run',  # NOQA
    op_args=['{{ is_initial(execution_date) }}']
)

t_do_nothing = DummyOperator(
    task_id='do_nothing'
)

#déclencher pour ne pas être sauté_rule='none_failed'L'ensemble
t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    trigger_rule='none_failed',
    dag=dag)

t_check_is_initial >> t_check_previous_dag_run >> t1
t_check_is_initial >> t_do_nothing >> t1

J'ai écrit le code tel que, et dans la première exécution, je l'ai évité en sautant ʻExternalTaskSensor`.

externaltasksensor.png

Il est devenu redondant, mais il est devenu évident d'attendre TaskInstance la veille.

Cependant, il est toujours redondant, veuillez donc apprendre si vous connaissez une autre façon d'attendre le résultat de l'exécution de la veille.

Stumble 3. ShortCircuitOperator, Ignorer les règles d'état

ShortCircuitOperator donne le statut de saut des tâches suivantes à tous lorsque la fonction déclarée avec python_callable retourne false. Par conséquent, il n'est pas possible d'ignorer la tâche immédiatement après, mais vous souhaitez exécuter la tâche plus loin.

スクリーンショット 2019-12-06 22.51.16.png

Dans l'exemple ci-dessus, vous ne pouvez pas utiliser la tâche de court-circuit (ShortCircuitOperator) pour ignorer print2_2 pour exécuter la tâche de fin.

De plus, dans BranchPythonOperator, quelque chose de similaire se produit si la trigger_rule de la tâche suivante est définie sur la valeur par défaut ʻall_success`.

t0 = BranchPythonOperator(
    task_id='first_and_branch',
    python_callable=lambda: 'print1',
    dag=dag)

t1 = BashOperator(
    task_id='print1',
    bash_command='echo 1',
    dag=dag)

t2 = BashOperator(
    task_id='print2',
    bash_command='echo 2',
    dag=dag)

t3 = BashOperator(
    task_id='finish',
    bash_command='echo finish',
    dag=dag
)

t0 >> t1
t0 >> t2
t1 >> t3
t2 >> t3
スクリーンショット 2019-12-06 23.10.15.png

Si le trigger_rule de la tâche de fin est ʻall_sucess`, il sera en état de saut si l'une des tâches parentes est en état de saut.

Si vous souhaitez que la tâche de fin s’exécute si aucune des tâches parentes n’a d’état d’échec Si vous définissez trigger_rule sur'aucun_failed 'comme indiqué ci-dessous, cela fonctionnera comme prévu.

t3 = BashOperator(
    task_id='finish',
    bash_command='echo finish',
    trigger_rule='none_failed',
    dag=dag
)

Si la partie de la tâche first_and_branch est ShortCircuitOperator et que le résultat de python_callable est faux, toutes les tâches suivantes seront en état de saut indépendamment de trigger_rule.

Trébucher 4. Notification en cas d'échec

Utilisez default_args pour envoyer des notifications de relâche si une tâche échoue Je l'ai écrit comme suit.

def send_slack():
    #Processus pour émettre une notification de relâche

default_args = {
    'start_date': datetime(2019, 12, 1),
    'on_failure_callback': send_slack
}

Cependant, dans cette manière d'écrire, lorsque la notification de mou n'est pas envoyée pour une raison quelconque, ce fait n'est pas affiché sur l'écran de gestion du flux d'air. En conséquence, il n'était parfois pas possible de remarquer que la tâche de notification de relâche elle-même était interrompue. Par conséquent, en indiquant clairement que la notification d'écart est envoyée à la fin de la tâche comme indiqué ci-dessous, même si la notification d'écart elle-même a échoué, cela peut être remarqué en regardant l'écran de gestion.

t_finish = DummyOperator(
    task_id='task_finish',
    trigger_rule='none_failed',
)

#Utilisez Operator pour envoyer vos propres notifications Slack
# trigger_Les notifications sont ignorées indépendamment du succès ou de l'échec en attribuant des tâches par règle
t_notification_on_success = CustomSlackOperator(
    task_id='notification_on_success',
    trigger_rule='none_failed'
)

t_notification_on_failed = CustomSlackOperator(
    task_id='notification_on_failed',
    is_success=False,
    trigger_rule='one_failed'
)

t_finish >> t_notification_on_success
t_finish >> t_notification_on_failed
スクリーンショット 2019-12-06 22.36.00.png

Puisqu'il est possible que la notification elle-même échoue en raison de la modification du paramètre du côté mou sans le savoir. Je pense qu'il est plus sûr de spécifier également la tâche de notification.

Résumé

Je suis tombé sur d'autres détails, La plupart des modèles peuvent être résolus en lisant attentivement la documentation (bien que je lis parfois le code source).

Flux d'air --Dag peut être défini de manière flexible avec du code python

C'est l'un des rares outils capables de faire de telles choses. Par conséquent, je pense que c'est un bon candidat pour exécuter régulièrement plusieurs tâches complexes.

Recommended Posts

Ce que je suis tombé sur l'utilisation d'Airflow
Ce sur quoi je suis tombé lors de l'utilisation de CodeIgniter sur un serveur Linux
Je suis tombé sur essayer Pylearn 2
Je suis tombé sur l'utilisation de MoviePy, alors prenez note
Ce que je suis entré lors de l'utilisation de Tensorflow-gpu
Je suis tombé sur l'installation de la phrase sur ubuntu
J'ai essayé d'utiliser paramétré
Ce que j'ai appris sur l'IA / l'apprentissage automatique avec Python (1)
J'ai essayé d'utiliser argparse
J'ai essayé d'utiliser la mimesis
J'ai essayé d'utiliser anytree
J'ai essayé d'utiliser aiomysql
J'ai essayé d'utiliser Summpy
J'ai essayé d'utiliser coturn
J'ai essayé d'utiliser Pipenv
J'ai essayé d'utiliser matplotlib
J'ai essayé d'utiliser "Anvil".
J'ai essayé d'utiliser Hubot
J'ai essayé d'utiliser ESPCN
J'ai essayé d'utiliser openpyxl
J'ai essayé d'utiliser Ipython
J'ai essayé d'utiliser PyCaret
Ce que j'ai appris sur l'IA / l'apprentissage automatique avec Python (3)
J'ai essayé d'utiliser cron
J'ai essayé d'utiliser ngrok
J'ai essayé d'utiliser face_recognition
J'ai essayé d'utiliser Jupyter
J'ai essayé d'utiliser doctest
J'ai essayé d'utiliser du folium
J'ai essayé d'utiliser jinja2
J'ai essayé d'utiliser du folium
Ce à quoi j'étais accro lors de l'utilisation de Python tornado
J'ai essayé d'utiliser la fenêtre de temps
Je suis tombé sur PyUnicodeUCS4_FromStringAndSize lors de l'insertion de TensorFlow avec pip
Ce que j'ai appris sur l'IA / l'apprentissage automatique avec Python (2)
Éléments à prendre en compte lors de la mise en œuvre d'Airflow avec docker-compose
Ce que j'ai appris sur l'IA / l'apprentissage automatique avec Python (4)
Ce qui a changé depuis que j'ai commencé à utiliser Visual Studio Code
Qu'est-ce qui a été demandé lors de l'utilisation de Random Forest dans la pratique
Je suis tombé sur TensorFlow (Quelle est la mémoire du GPU)
[J'ai essayé d'utiliser Pythonista 3] Introduction
J'ai essayé d'utiliser easydict (mémo).
J'ai essayé la reconnaissance faciale avec Face ++
J'ai essayé d'utiliser RandomForest
J'ai essayé d'utiliser BigQuery ML
Où je suis tombé sur SQLite3
J'ai essayé d'utiliser Amazon Glacier
J'ai essayé d'utiliser git inspector
Ce que j'ai appris sur Linux
J'ai essayé d'utiliser magenta / TensorFlow
Ce que j'ai appris en Python
J'ai essayé d'utiliser AWS Chalice
J'ai essayé d'utiliser l'émojinateur Slack
Scribble ce que j'ai utilisé lors de l'utilisation d'ipython dans le formatage des données de position