Lorsque nous avons construit l'infrastructure de données, nous avons adopté Airflow comme pipeline de données. À ce moment-là, il y avait des points d'achoppement, je vais donc les noter.
Nous développons et exploitons plusieurs systèmes utilisant l'apprentissage automatique. Au fur et à mesure que le nombre de projets a augmenté et que les opérations ont progressé, il est devenu nécessaire de répondre aux exigences communes suivantes.
Par conséquent, nous avons décidé que l'infrastructure de données était une phase nécessaire et avons décidé de la construire. Lors de la création d'une infrastructure de données, il est nécessaire de traiter les données d'origine pour un entrepôt de données ou un magasin de données.
À ce moment-là, le pipeline de données devait répondre aux exigences suivantes.
Nous avons adopté Airflow comme un outil qui semble répondre à ces exigences.
C'est la partie inférieure du diagramme conceptuel.
Aiflow l'a implémenté pour répondre aux exigences ci-dessus.
La version d'Airflow est «1.10.5».
default_args = {
'owner': 'Airflow',
'start_date': datetime(2019, 12, 1),
}
dag = DAG('tutorial1', default_args=default_args,
schedule_interval=timedelta(days=1))
Ce code commence par ʻexecution_date from
2019-12-01T 00: 00: 00 + 00: 00 Il est exécuté comme 12/1, 12/2, 12/3 ..., et lorsque l'exécution passée est terminée, il sera exécuté tous les jours. À ce moment, en supposant qu'aujourd'hui est
2019-12-06T01: 00: 00 + 00: 00 (UTC)`, combien de temps sera exécuté date_exécution?
La réponse est que TaskInstances jusqu'au 2019-12-05T 00: 00: 00 + 00: 00 (UTC)
sera exécuté.
J'ai mal compris que execution_date courrait jusqu'au 12/6 alors que la date d'aujourd'hui est 12/6.
Voici un diagramme en image.
En plus de cela, s'il y a des exigences telles que vouloir gérer l'heure dans le fuseau horaire: ʻAsie / Tokyo` dans la tâche Soyez prudent car cela peut prêter à confusion.
Parce qu'il était nécessaire d'effectuer le traitement d'aujourd'hui en utilisant le résultat de l'exécution de TaskInstance de la veille
TaskInstance d'aujourd'hui a dû attendre le succès de TaskInstance de la veille.
Par conséquent, j'ai utilisé wait_for_downstream
pour attendre le résultat d'une tâche particulière dans la TaskInstance précédente.
t1 = BashOperator(
task_id='print1',
bash_command='echo 1',
wait_for_downstream=True,
dag=dag)
Cependant, wait_for_downstream
n'attend pas le résultat de la TaskInstance entière précédente.
t1 = BashOperator(
task_id='print1',
bash_command='echo 1',
wait_for_downstream=True,
dag=dag)
t2 = BashOperator(
task_id='print2',
bash_command='echo 2',
wait_for_downstream=True,
dag=dag)
t1 >> t2
Si vous écrivez, la tâche t1 pour cette heure sera exécutée lorsque la tâche t1 pour l'instant précédent sera terminée (sans attendre l'achèvement de t2). Cependant, la tâche t1 pendant ce temps doit attendre à la fois les tâches t1 et t2 pour l'instant précédent. J'ai donc utilisé ʻExternalTaskSensor` et l'ai configuré pour attendre la dernière tâche pour la dernière fois.
t_check_previous_dag_run = ExternalTaskSensor(
task_id='is_success_pre_dag_run',
external_dag_id=dag.dag_id,
allowed_states=['success', 'skipped'],
external_task_id='your_last_task_id',
execution_delta=timedelta(days=1)
)
#t1 est la première tâche que vous souhaitez effectuer
t_check_previous_dag_run >> t1
Cependant, TaskInstance (execution_date = start_date) qui fonctionne en premier avec cette seule description Continuera d'attendre l'achèvement des tâches qui n'existent pas et ne se poursuivront pas.
Par conséquent, en outre
# is_initial est une fonction permettant de déterminer s'il s'agit du premier utilisateur d'exécution_defined_Défini et utilisé dans les macros
t_check_is_initial = BranchPythonOperator(
task_id='is_initial',
python_callable=lambda is_initial_str: 'do_nothing' if is_initial_str == 'True' else 'is_success_pre_dag_run', # NOQA
op_args=['{{ is_initial(execution_date) }}']
)
t_do_nothing = DummyOperator(
task_id='do_nothing'
)
#déclencher pour ne pas être sauté_rule='none_failed'L'ensemble
t1 = BashOperator(
task_id='print1',
bash_command='echo 1',
trigger_rule='none_failed',
dag=dag)
t_check_is_initial >> t_check_previous_dag_run >> t1
t_check_is_initial >> t_do_nothing >> t1
J'ai écrit le code tel que, et dans la première exécution, je l'ai évité en sautant ʻExternalTaskSensor`.
Il est devenu redondant, mais il est devenu évident d'attendre TaskInstance la veille.
Cependant, il est toujours redondant, veuillez donc apprendre si vous connaissez une autre façon d'attendre le résultat de l'exécution de la veille.
ShortCircuitOperator
donne le statut de saut des tâches suivantes à tous
lorsque la fonction déclarée avec python_callable retourne false.
Par conséquent, il n'est pas possible d'ignorer la tâche immédiatement après, mais vous souhaitez exécuter la tâche plus loin.
Dans l'exemple ci-dessus, vous ne pouvez pas utiliser la tâche de court-circuit (ShortCircuitOperator) pour ignorer print2_2 pour exécuter la tâche de fin.
De plus, dans BranchPythonOperator
, quelque chose de similaire se produit si la trigger_rule de la tâche suivante est définie sur la valeur par défaut ʻall_success`.
t0 = BranchPythonOperator(
task_id='first_and_branch',
python_callable=lambda: 'print1',
dag=dag)
t1 = BashOperator(
task_id='print1',
bash_command='echo 1',
dag=dag)
t2 = BashOperator(
task_id='print2',
bash_command='echo 2',
dag=dag)
t3 = BashOperator(
task_id='finish',
bash_command='echo finish',
dag=dag
)
t0 >> t1
t0 >> t2
t1 >> t3
t2 >> t3
Si le trigger_rule de la tâche de fin est ʻall_sucess`, il sera en état de saut si l'une des tâches parentes est en état de saut.
Si vous souhaitez que la tâche de fin s’exécute si aucune des tâches parentes n’a d’état d’échec Si vous définissez trigger_rule sur'aucun_failed 'comme indiqué ci-dessous, cela fonctionnera comme prévu.
t3 = BashOperator(
task_id='finish',
bash_command='echo finish',
trigger_rule='none_failed',
dag=dag
)
Si la partie de la tâche first_and_branch est ShortCircuitOperator
et que le résultat de python_callable est faux, toutes les tâches suivantes seront en état de saut indépendamment de trigger_rule.
Utilisez default_args pour envoyer des notifications de relâche si une tâche échoue Je l'ai écrit comme suit.
def send_slack():
#Processus pour émettre une notification de relâche
default_args = {
'start_date': datetime(2019, 12, 1),
'on_failure_callback': send_slack
}
Cependant, dans cette manière d'écrire, lorsque la notification de mou n'est pas envoyée pour une raison quelconque, ce fait n'est pas affiché sur l'écran de gestion du flux d'air. En conséquence, il n'était parfois pas possible de remarquer que la tâche de notification de relâche elle-même était interrompue. Par conséquent, en indiquant clairement que la notification d'écart est envoyée à la fin de la tâche comme indiqué ci-dessous, même si la notification d'écart elle-même a échoué, cela peut être remarqué en regardant l'écran de gestion.
t_finish = DummyOperator(
task_id='task_finish',
trigger_rule='none_failed',
)
#Utilisez Operator pour envoyer vos propres notifications Slack
# trigger_Les notifications sont ignorées indépendamment du succès ou de l'échec en attribuant des tâches par règle
t_notification_on_success = CustomSlackOperator(
task_id='notification_on_success',
trigger_rule='none_failed'
)
t_notification_on_failed = CustomSlackOperator(
task_id='notification_on_failed',
is_success=False,
trigger_rule='one_failed'
)
t_finish >> t_notification_on_success
t_finish >> t_notification_on_failed
Puisqu'il est possible que la notification elle-même échoue en raison de la modification du paramètre du côté mou sans le savoir. Je pense qu'il est plus sûr de spécifier également la tâche de notification.
Je suis tombé sur d'autres détails, La plupart des modèles peuvent être résolus en lisant attentivement la documentation (bien que je lis parfois le code source).
Flux d'air --Dag peut être défini de manière flexible avec du code python
C'est l'un des rares outils capables de faire de telles choses. Par conséquent, je pense que c'est un bon candidat pour exécuter régulièrement plusieurs tâches complexes.
Recommended Posts