[PYTHON] Play with the GCP free tier ② ~ Airflow (on Compute Engine), Cloud Functions ~

About this article

This is one of a series of articles introducing what you can do with the GCP free tier (the previous article is here). The contents of the free tier are subject to change, and pay-as-you-go services may incur charges if you exceed the limits, so please check the official information and use them at your own risk.

The goal this time is to run **Cloud Functions** from **Apache Airflow** (hereafter Airflow) installed on **Compute Engine**. It may be a slightly unusual configuration, but I actually use it to automate collecting and posting information on Twitter (here).

By the way, I settled on this configuration for the following reasons. If you have the budget, just use **Cloud Composer**.

- I thought the only way to use Airflow for free was to install it on f1-micro (Compute Engine's free tier).
- Once Airflow is running, f1-micro has little memory to spare, so I thought I had to offload the actual work to Cloud Functions.

A brief introduction to each service

Cloud Functions

A GCP service that runs code serverlessly. This time, we will run Python code in response to HTTP requests. Functions can also be triggered from Cloud Pub/Sub and written in languages other than Python.

Compute Engine

GCP's virtual machine service. Within the free tier you can use a machine type called f1-micro (note that it has only 0.6 GB of memory, so the specs are modest). You can choose the OS from Ubuntu, Debian, CentOS, and others.

Airflow

A framework for managing workflows, originally developed by Airbnb. It resembles an enhanced version of cron, with the following advantages.

- If an error occurs, a task can be re-run any number of times and at any time.
- Dependencies between tasks can be specified as a DAG (Directed Acyclic Graph).
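To make these two advantages concrete, here is a minimal plain-Python sketch of the ideas behind them: ordering tasks by their dependencies and retrying a failing task. It does not use Airflow and is not how Airflow is implemented; the function names `topological_order` and `run_with_retries` are made up for this illustration, and the wait between retries is left out.

```python
from collections import deque


def topological_order(dependencies):
    """Return task names so that every task comes after its upstream tasks.

    `dependencies` maps each task name to the list of tasks it depends on.
    """
    remaining = {}
    for task, upstream in dependencies.items():
        remaining.setdefault(task, set()).update(upstream)
        for up in upstream:
            remaining.setdefault(up, set())  # upstream-only tasks have no dependencies

    ready = deque(sorted(t for t, ups in remaining.items() if not ups))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        # Release every task that was waiting only on the one just scheduled
        for other, ups in sorted(remaining.items()):
            if task in ups:
                ups.remove(task)
                if not ups:
                    ready.append(other)

    if len(order) != len(remaining):
        raise ValueError("cycle detected: the graph is not a DAG")
    return order


def run_with_retries(func, retries):
    """Call func(); if it raises, try again up to `retries` more times."""
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception:
            if attempt == retries:
                raise
```

For example, `topological_order({"fetch_weather": [], "notify": ["fetch_weather"]})` returns `["fetch_weather", "notify"]`, which mirrors the two-step flow built later in this article.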

GCP offers a managed Airflow service called Cloud Composer, but it does not seem to have a free tier, so this time I will install Airflow myself on a free-tier Compute Engine instance.

Cloud Functions settings

Here, we will create the following two Functions.

  1. Get tomorrow's weather from API and save it to Cloud Storage
  2. Get tomorrow's weather from Cloud Storage and notify with LINE Notify

The following two files are prepared. Normally each function would have its own files, but this time, to keep things simple, both Functions share the same main.py and requirements.txt.

First, the contents of main.py are as follows. For more information on LINE Notify, see the article here.

main.py


import requests
import json
import datetime
from google.cloud import storage

# function1: get tomorrow's weather from the API and save it to Cloud Storage
def function1(request):
    url = "http://weather.livedoor.com/forecast/webservice/json/v1"
    payload = {"city": 130010}  # Tokyo
    res = requests.get(url, params=payload)
    res_json = json.loads(res.text.replace("\n", ""))  # "\n" causes a parse error, so strip it
    tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
    forecast = [x for x in res_json["forecasts"] if x["date"] == tomorrow.strftime("%Y-%m-%d")][0]
    client = storage.Client()
    bucket = client.get_bucket("xxxxx")  # Create your own bucket and replace this name
    blob = bucket.blob("forecast.json")
    blob.upload_from_string(json.dumps(forecast))
    return "OK"  # HTTP functions should return a response body

# function2: get tomorrow's weather from Cloud Storage and notify with LINE Notify
def send_message(msg):
    url = "https://notify-api.line.me/api/notify"
    token = "xxxxx"  # Replace with your own token
    payload = {"message": msg}
    headers = {"Authorization": "Bearer {}".format(token)}
    requests.post(url, data=payload, headers=headers)

def function2(request):  # named "request" so it does not shadow the requests module
    client = storage.Client()
    bucket = client.get_bucket("xxxxx")  # Create your own bucket and replace this name
    blob = bucket.blob("forecast.json")
    forecast = json.loads(blob.download_as_string())
    send_message(forecast["telop"])
    return "OK"  # HTTP functions should return a response body
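One detail worth noting: main.py computes "tomorrow" with `datetime.datetime.now()`, which follows the clock of the runtime. Assuming that clock is UTC (please verify for your environment), the result differs from the Japanese calendar date between 15:00 and 24:00 UTC. With the 09:00 UTC schedule used later in this article the two dates coincide, so the original code works as-is. The following is only a sketch of how the date could be pinned to Japan time and how a missing forecast could be handled; `tomorrow_in_jst` and `pick_forecast` are hypothetical helper names, not part of the original code.

```python
import datetime

JST = datetime.timezone(datetime.timedelta(hours=9))  # Japan Standard Time, UTC+9


def tomorrow_in_jst(now):
    """Return tomorrow's date in Japan as a "YYYY-MM-DD" string.

    `now` must be a timezone-aware datetime.
    """
    tomorrow = now.astimezone(JST) + datetime.timedelta(days=1)
    return tomorrow.strftime("%Y-%m-%d")


def pick_forecast(forecasts, date_str):
    """Return the first forecast whose "date" equals date_str, or None if there is none."""
    return next((f for f in forecasts if f.get("date") == date_str), None)
```

In function1 this could be called as `pick_forecast(res_json["forecasts"], tomorrow_in_jst(datetime.datetime.now(datetime.timezone.utc)))`. Unlike the `[0]` indexing in main.py, it returns None instead of raising IndexError when no entry matches.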

Next, requirements.txt should look like this:

requirements.txt


requests==2.22.0
google-cloud-storage==1.26.0

With both files in place, run the following commands in the directory containing main.py and requirements.txt to deploy. The key point is --ingress-settings internal-only, which blocks HTTP requests from outside. Requests from the Compute Engine instance created later still work without any problems.

gcloud functions deploy qiita_function1 --entry-point function1 --runtime python37 --trigger-http --ingress-settings internal-only --allow-unauthenticated
gcloud functions deploy qiita_function2 --entry-point function2 --runtime python37 --trigger-http --ingress-settings internal-only --allow-unauthenticated

Run the functions from the Cloud Functions console to check that they actually work. Also note the trigger URL (in a format like https://us-central1-<PROJECT>.cloudfunctions.net/qiita_function1).
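Since both trigger URLs are needed again when writing the DAG, a small helper that assembles them in the format shown above may be handy. This is only a sketch: `trigger_url` is a hypothetical name, `my-project` is a placeholder project ID, and the URL displayed in the console remains the authoritative one.

```python
def trigger_url(region, project, function_name):
    """Build an HTTP trigger URL in the <region>-<project>.cloudfunctions.net format."""
    return "https://{}-{}.cloudfunctions.net/{}".format(region, project, function_name)


# Placeholder project ID; replace it with your own
urls = {
    name: trigger_url("us-central1", "my-project", name)
    for name in ("qiita_function1", "qiita_function2")
}
```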


Installing Airflow

First, create a Compute Engine instance from the GCP console. In the environment where I tested this, the settings inside the red frames in the image below were changed from their defaults.

- Changed the machine type to f1-micro (to stay within the free tier)
- Changed the image to Ubuntu 18.04 LTS

gce.PNG

After logging in to the instance, run the following commands to install Python 3 and Airflow. The requests library is used later, so install it at the same time. For reference, the apache-airflow version when I tested this was 1.10.9.

sudo apt update
sudo apt -y install python3.7 python3-pip
pip3 install --upgrade apache-airflow requests

Log out once, log back in, and then initialize the database with the following command. By default, ~/airflow is the Airflow home directory.

airflow initdb

Then edit ~/airflow/airflow.cfg to change the settings. Find the relevant lines and set them as follows.

~/airflow/airflow.cfg


# Do not pause a DAG when it is first recognized (if left True, you must run the airflow unpause command explicitly)
dags_are_paused_at_creation = False

# Do not backfill past runs when a DAG has a start date in the past
catchup_by_default = False

# Do not load the example DAGs
load_examples = False
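Because airflow.cfg is long, it is easy to miss one of the three lines. The following sketch uses Python's standard configparser to check whether the file contains the values above. `find_mismatches` is a hypothetical helper written for this article; it searches every section for each option, so it makes no assumption about which section an option lives in.

```python
import configparser

EXPECTED = {
    "dags_are_paused_at_creation": "False",
    "catchup_by_default": "False",
    "load_examples": "False",
}


def find_mismatches(cfg_text, expected=EXPECTED):
    """Return {option: actual value, or None if absent} for options that differ from `expected`."""
    # interpolation=None: treat "%" characters in values literally
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    mismatches = {}
    for option, wanted in expected.items():
        actual = None
        for section in parser.sections():
            if parser.has_option(section, option):
                actual = parser.get(section, option)
                break
        if actual is None or actual.strip().lower() != wanted.lower():
            mismatches[option] = actual
    return mismatches
```

To use it, read the file and pass its text, for example `find_mismatches(open(os.path.expanduser("~/airflow/airflow.cfg")).read())` after `import os`. An empty dict means all three settings are in place.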

Next, create a DAG file that calls Cloud Functions under ~/airflow/dags. Replace the URLs with the Cloud Functions trigger URLs you just created.

~/airflow/dags/qiita_sample.py


import os
from datetime import timedelta

import requests
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

def exec_functions(url):
    # Call a Cloud Function over HTTP and fail the task unless the response is 2xx
    payload = {}
    res = requests.post(url, data=payload)
    if res.status_code//100 != 2:
        raise Exception("response status code is not in 200 - 299")

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
common_args = {
    'owner': os.environ.get("USER", "unknown"),
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'qiita_sample_v0.0',
    default_args=common_args,
    description='sample dag',
    start_date=days_ago(1),
    schedule_interval="00 09 * * *",  # Run at 09:00 UTC every day (18:00 Japan time)
)

task1 = PythonOperator(
    task_id='qiita_function1',
    python_callable=exec_functions,
    #provide_context=True,
    op_kwargs={
        "url": "xxxxx", #URL of the first Cloud Functions
    },
    dag=dag,
)

task2 = PythonOperator(
    task_id='qiita_function2',
    python_callable=exec_functions,
    #provide_context=True,
    op_kwargs={
        "url": "xxxxx", #URL for the second Cloud Functions
    },
    dag=dag,
)

task1 >> task2 #Specify task dependencies

After creating the file, run the `airflow list_dags` command; if this DAG appears in the output, it has been recognized correctly. The code contains some comments, but here are a few additional points.

- The schedule can be specified in the same format as cron. Note that the default timezone is UTC, not Japan time.
- The function `exec_functions` is defined at the top and passed to PythonOperator. Note that its arguments are passed in a slightly unusual way, through `op_kwargs`. Incidentally, if you set provide_context=True, information such as the DAG and task names can also be passed to the function.
- The final task1 >> task2 specifies the dependency between the tasks. Here, it would be a problem if the message were sent before the forecast had been fetched from the API, so the order is enforced this way.
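The timezone point is easy to get wrong, so here is a small sketch that converts a daily run time given in Japan time into the cron expression to write in the DAG. It assumes Airflow's timezone setting is left at its default of UTC; `daily_cron_for_jst` is a hypothetical helper name and is not part of Airflow.

```python
def daily_cron_for_jst(hour_jst, minute=0):
    """Return a cron expression, interpreted in UTC, for a daily run at hour_jst:minute Japan time."""
    if not (0 <= hour_jst <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute must be 0-59")
    hour_utc = (hour_jst - 9) % 24  # JST is UTC+9 and has no daylight saving time
    return "{:02d} {:02d} * * *".format(minute, hour_utc)
```

`daily_cron_for_jst(18)` gives `"00 09 * * *"`, the schedule used in the DAG above, and `daily_cron_for_jst(7, 30)` gives `"30 22 * * *"`. For a daily job it does not matter that the UTC time may fall on the previous calendar day.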

Finally, run the following command to start the Airflow scheduler as a daemon so that the DAG runs at the scheduled time. Processing continues even after you log out of Compute Engine [^1].

airflow scheduler -D

In closing

Most articles about Airflow also cover the Web UI, but I did not set it up here. Because of the memory limit of f1-micro, running the `airflow webserver` command fails after a few seconds with a complaint that there is not enough memory.

If you can do without automatic retries and dependency specification, Cloud Scheduler is a possible compromise instead of Airflow. In that case, note that the free tier is limited to 3 jobs. Also, in my environment, Cloud Scheduler could not invoke Cloud Functions that were deployed with --ingress-settings internal-only, so I think requests would need to be restricted in some other way.

[^1]: At first I tried to start it with systemd, but that did not work, so I settled on this approach.
