[PYTHON] Airflow: I tried programming data pipeline scheduling and monitoring

By introducing Airflow, you can prevent the familiar pattern where a cron batch job fails, you hunt through the log files, and the logging turns out to be too sparse to identify the cause.

Do you know Airflow?

Airflow is Airbnb's open-source data pipeline scheduling and monitoring tool. Simply put, it is a high-performance cron that can build a job tree. It is open-source software written in the Python 2 series and installable with pip. At re:Invent 2015, AWS's big annual event, it was announced that several companies are using Airflow, and it attracted attention. I became interested after reading the Yahoo announcement. This article is a memo from investigating and verifying whether Airflow should be introduced into our project.

■ I tried putting the project's analysis tasks on Airflow (screenshot: スクリーンショット 2015-11-09 4.06.06.png)

Airflow is a system that lets you program the scheduling and monitoring of data pipelines.

There was little Japanese material, so at first I didn't understand what Airflow could do. I'll postpone the installation steps and first fill in how to use it. Looking back after examining it for about a week, I think the description in the first line of the Airflow repository captures Airflow best.

Airflow is a system to programmatically author, schedule and monitor data pipelines. (Loose translation: Airflow is a system that lets you write, schedule, and monitor data pipelines as code.)

If you use Airflow for anything other than scheduling and monitoring, for example for writing data-manipulation commands or for running tasks manually, it quickly becomes a hard system to use, so it is important to draw that line before introducing it.

Learning from a concrete example: loading data from a DB into Google BigQuery

I wrote one of our analysis tasks in Airflow. The reason the data is staged in S3 first is that if processing fails midway and the data is re-fetched from the DB, the acquisition time would shift, and I also don't want to run the dump command many times a day.

■ Specifications Once a day, dump the necessary data from MySQL and save it to S3, then copy it to Google Cloud Storage and load it into BigQuery. When the data has been placed in Google Cloud Storage, send an email to the people concerned.

■ Break down specifications into tasks

  1. Export MySQL Data to Amazon S3 Using AWS Data Pipeline (https://docs.aws.amazon.com/ja_jp/datapipeline/latest/DeveloperGuide/dp-copydata-mysql.html)
  2. Copy data from S3 to Google Cloud Storage (a rough sketch of this step follows the list)
  3. Load the data from Google Cloud Storage into BigQuery
  4. Send a notification email to the people concerned
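
The step scripts themselves are kept outside Airflow (see the design note below). As a rough sketch of what step 2 could look like (my own illustration, not the original project's code; it assumes gsutil is installed with both AWS and Google Cloud credentials configured, and the bucket names are placeholders):

copy_s3_to_google_storage.py (hypothetical sketch)

# -*- coding: utf-8 -*-
# Rough sketch of step 2 only; bucket names are placeholders.
import subprocess

S3_PATH = 's3://my-export-bucket/db_dump/'      # placeholder
GCS_PATH = 'gs://my-analytics-bucket/db_dump/'  # placeholder


def copy_s3_to_gcs():
    # gsutil can copy straight from s3:// to gs:// when ~/.boto holds AWS keys
    subprocess.check_call(['gsutil', '-m', 'cp', '-r', S3_PATH, GCS_PATH])


if __name__ == '__main__':
    copy_s3_to_gcs()

import_big_query.py and send_mail.py would follow the same pattern: small standalone scripts that Airflow merely invokes on schedule.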

■ What you should not do when designing for Airflow As is also recommended with Jenkins, the detailed definition of each job should be kept in its own shell script or command. If you write heavyweight business logic on the Airflow side, it becomes hard to manage updates and to deploy changes. The code on the Airflow side should be limited to the flow and the schedule.

■ Airflow task implementation example Let's implement the specification in Airflow. The logic goes into export_db.py. (The fact that the file is not recognized as an Airflow task if it contains even one Japanese comment struck me as really awful.)

export_db.py


# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='export_db', default_args=args)

CMD_BASE_DIR = '~/analyze/{}'

# cmd file name
EXPORT_DB_TO_S3_CMD = 'export_db_to_s3.sh'
COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD = 'copy_s3_to_google_storage.py'
IMPORT_BIG_QUERY_CMD = 'import_big_query.py'
SEND_MAIL_CMD = 'send_mail.py'


def do_cmd(cmd):
    # Run the given shell command; the actual business logic lives in the command itself.
    os.system(cmd)

# define task
# 1. db to s3
# Note: provide_context is left off because do_cmd() only accepts cmd;
# with provide_context=True the Airflow context kwargs would cause a TypeError.
task1 = PythonOperator(
    task_id='1.' + EXPORT_DB_TO_S3_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(EXPORT_DB_TO_S3_CMD)},
    dag=dag)

# 2. s3 to cloud storage
task2 = PythonOperator(
    task_id='2.' + COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD)},
    dag=dag)

# 3. import bq
task3 = PythonOperator(
    task_id='3.' + IMPORT_BIG_QUERY_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(IMPORT_BIG_QUERY_CMD)},
    dag=dag)

# 4. send mail
task4 = PythonOperator(
    task_id='4.' + SEND_MAIL_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(SEND_MAIL_CMD)},
    dag=dag)

# define task stream
# 1 >> 2 >> 3 and 2 >> 4
task1.set_downstream(task2)
task2.set_downstream(task3)
task2.set_downstream(task4)

# define start task
run_this = task1

■ Test if the task works python ~/airflow/dags/export_db.py

■ Restart Airflow to pick up the task The fact that Airflow has to be restarted before a new task is picked up is, frankly, terrible. Because of this drawback, writing business logic inside the tasks is not recommended.

■ Confirm in the browser that the task is registered By default, export_db appears on the top page at [http://localhost:8080/admin/](http://localhost:8080/admin/).

■ Schedule execution To run on the schedule defined in Python, start `airflow scheduler`. In this example start_date is set to seven days ago, so the scheduler just keeps running the DAG over and over, endlessly.
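
A start_date computed dynamically, like seven_days_ago above, moves every time the file is parsed. If you want deterministic once-a-day runs instead, one option (my own sketch against the same API used above; the date is a placeholder) is to pin start_date and set an explicit schedule_interval:

# Sketch: a fixed start_date plus a daily schedule_interval,
# instead of the moving "seven days ago" used in export_db.py above.
from datetime import datetime, timedelta
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': datetime(2015, 11, 1),  # fixed placeholder date
}

dag = DAG(dag_id='export_db', default_args=args,
          schedule_interval=timedelta(days=1))  # run once a day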

■ Image 1. Graph view of the specified task (screenshot: スクリーンショット 2015-11-09 4.43.41.png)

■ Image 2. Top DAG list (screenshot: スクリーンショット 2015-11-09 4.15.18.png)

■ Image 3. Tree view of tasks (screenshot: スクリーンショット 2015-11-09 4.17.32.png)

I found Airflow to be more convenient than cron

These are the advantages of adopting it compared with operating via cron. They are largely subjective.

■ 1. Execution time is visualized per task I think it's great that execution times are summarized and shown as graphs in a clean web view. Getting that kind of summary is hard when you run batches from cron and just dump the output into a log.

■ 2. The error logs are simply easy to read From the web UI you can see which task run failed, at what time, and what was written to standard error. That is a big difference from a setup that keeps spitting out cron log files that are not even rotated. It's great that the mechanism itself prevents the painful pattern where an error occurs, you hunt through the log file, and the logging turns out to be too sparse to identify the cause.

■ 3. You can build a job tree You can clearly define things such as running task B after task A has completed.

■ 4. Changes to the tree and schedule are recorded in git Because the schedule and the task tree are written in Python, managing them with git preserves the change history.

Jenkins and Airflow have different uses

~~Airflow cannot run tasks manually. Since the product aims in a different direction from Jenkins, I suspect this is deliberately not implemented.~~ (Correction: tasks can be run manually in Airflow by introducing the CeleryExecutor. See issues/51 for why the CeleryExecutor is needed.) In Airflow, even the task names are all defined in Python code. From the web GUI you can only monitor execution status; you cannot change a task's behavior at all. I think that, too, is deliberate. Airflow is a sharply focused product, so if you misunderstand this point and try to run it as a replacement for Jenkins, you may find it unusable.

Summary: Applications that Airflow is suitable for

Fully automated tasks whose existence you normally don't even need to be aware of, where you only look into which task the error occurred in when a failure happens, maybe once a month: I felt Airflow is well suited to that kind of use.

Introduction method

In my local environment, where MySQL was already running, I was able to install it, start it, and check it in a browser within about 10 minutes.

install I installed it while reading the official README.rst.

mkvirtualenv airflow
mkdir ~/airflow
cd ~/airflow/
pip install airflow[mysql]
export AIRFLOW_HOME=~/airflow
airflow initdb

run airflow webserver -p 8080

Connectivity check

Access [http://localhost:8080/admin/](http://localhost:8080/admin/) in your browser

Define the first task

mkdir ~/airflow/dags
touch ~/airflow/dags/__init__.py
touch ~/airflow/dags/export_db.py
# Write the task definition in export_db.py

Task testing

python ~/airflow/dags/export_db.py

List the DAGs and tasks

airflow list_dags
airflow list_tasks export_db
airflow list_tasks export_db --tree

Schedule execution

airflow scheduler
