[PYTHON] Jouez avec le cadre gratuit GCP ② ~ Airflow (sur Compute Engine), Cloud Functions ~

À propos de cet article

Je vais vous présenter ce que vous pouvez faire avec le cadre gratuit de GCP à plusieurs reprises (l'article précédent ici). Le contenu de l'offre gratuite est sujet à changement, et les paiements à l'utilisation peuvent être facturés si la limite est dépassée, donc Informations officielles Veuillez utiliser à vos risques et périls en vérifiant = ja).

L'objectif cette fois-ci est d'exécuter ** Cloud Functions ** à partir de ** Apache Airflow ** (ci-après Airflow) installé sur ** Compute Engine **. Cela peut être une configuration un peu étrange, mais j'utilise en fait cette configuration pour automatiser la collecte et la transmission d'informations sur Twitter (ici).

Au fait, j'ai choisi cette configuration pour les raisons suivantes. Si vous avez de l'argent, utilisez ** Cloud Composer ** docilement.

Une brève introduction à chaque service

Cloud Functions Un service GCP qui vous permet d'exécuter du code sans serveur. Cette fois, nous exécuterons le code Python en fonction de la requête HTTP. Vous pouvez également exécuter à partir de Cloud Pub / Sub ou d'un code autre que Python.

Compute Engine Machine virtuelle GCP. S'il s'agit d'un cadre libre, vous pouvez utiliser un type de machine appelé f1-micro (cependant, la mémoire est de 0,6 Go et les spécifications sont modestes). Vous pouvez également choisir le système d'exploitation d'Ubuntu, Debian, CentOS, etc.

Airflow Développé à l'origine par Airbnb, il s'agit d'un cadre de gestion des flux de travail. Cela ressemble à une version améliorée de cron, mais il présente les avantages suivants.

GCP ne semble pas avoir de niveau gratuit pour un service appelé Cloud Composer, donc cette fois je l'installerai moi-même sur le niveau gratuit Compute Engine.

Paramètres Cloud Functions

Ici, nous allons créer les deux fonctions suivantes.

  1. Obtenez la météo de demain sur API et enregistrez-la dans Cloud Storage
  2. Obtenez la météo de demain depuis Cloud Storage et notifiez avec LINE Notify

Les deux fichiers suivants sont préparés. Normalement, je pense que le fichier est séparé pour chaque fonction, mais cette fois, il est facile de rendre à la fois main.py et requirements.py communs aux deux fonctions.

Tout d'abord, le contenu de main.py est le suivant. Pour plus d'informations sur LINE Notify, consultez l'article ici.

main.py


import requests
import json
import datetime
from google.cloud import storage

# function1...Obtenez la météo de demain à partir de l'API et enregistrez-la dans Cloud Storage
def function1(request):
    url = "http://weather.livedoor.com/forecast/webservice/json/v1"
    payload = {"city": 130010} #Tokyo
    res = requests.get(url, params=payload)
    res_json = json.loads(res.text.replace("\n", "")) # "\n"Provoque une erreur, alors remplacez
    tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
    forecast = [x for x in res_json["forecasts"] if x["date"] == tomorrow.strftime("%Y-%m-%d")][0]
    client = storage.Client()
    bucket = client.get_bucket("xxxxx") #Créez et remplacez votre propre bucket
    blob = bucket.blob("forecast.json")
    blob.upload_from_string(json.dumps(forecast))

# function2...Obtenez la météo de demain depuis Cloud Storage et notifiez avec LINE Notify
def send_message(msg):
    url = "https://notify-api.line.me/api/notify"
    token = "xxxxx" #Remplacez par votre propre jeton
    payload = {"message": msg}
    headers = {"Authorization": "Bearer {}".format(token)}
    requests.post(url, data=payload, headers=headers)

def function2(requests):
    client = storage.Client()
    bucket = client.get_bucket("xxxxx") #Créez et remplacez votre propre bucket
    blob = bucket.blob("forecast.json")
    forecast = json.loads(blob.download_as_string())
    send_message(forecast["telop"])

Ensuite, requirements.txt ressemble à ceci:

requirements.txt


requests==2.22.0
google-cloud-storage==1.26.0

À ce stade, dans le répertoire où se trouvent main.py et requirements.txt, exécutez ce qui suit pour déployer. Le but est d'interdire les requêtes HTTP de l'extérieur avec --ingress-settings internal-only. Vous pouvez le demander à Compute Engine que vous créerez ultérieurement sans aucun problème.

gcloud functions deploy qiita_function1 --entry-point function1 --runtime python37 --trigger-http --ingress-settings internal-only --allow-unauthenticated
gcloud functions deploy qiita_function2 --entry-point function2 --runtime python37 --trigger-http --ingress-settings internal-only --allow-unauthenticated

Déplaçons-le de la console Cloud Functions pour voir s'il fonctionne réellement. Vérifiez également l'URL du déclencheur (dans un format comme https: // us-central1- <PROJECT> .cloudfunctions.net / qiita_function1).

function.PNG

Présentation d'Airflow

Commencez par créer un moteur Compute Engine à partir de la console GCP. Dans l'environnement où j'ai vérifié l'opération, la partie du cadre rouge de l'image ci-dessous est modifiée par défaut.

gce.PNG

Une fois connecté, exécutez le code suivant. L'installation de Python3 et l'installation d'Airflow sont terminées. J'utiliserai les requêtes plus tard, alors installons-les ensemble. À propos, au moment de la vérification du fonctionnement, la version d'apache-airflow était la 1.10.9.

sudo apt update
sudo apt -y install python3.7 python3-pip
pip3 install --upgrade apache-airflow requests

Une fois déconnecté, connectez-vous à nouveau et initialisez la base de données avec le code suivant. Par défaut, ~ / airflow est le répertoire de base d'Airflow.

airflow initdb

Puis éditez ~ / airflow / airflow.cfg pour changer les paramètres. Trouvez la pièce appropriée et procédez comme suit.

~/airflow/airflow.cfg


#Ne vous arrêtez pas lorsque le DAG est reconnu (nécessite une commande de réactivation du flux d'air explicite si elle est laissée à True)
dags_are_paused_at_creation = False

#Ne pas exécuter les dernières minutes lors de l'exécution de DAG avec une date de début dans le passé
catchup_by_default = False

#Ne pas afficher l'exemple de DAG
load_examples = False

Ensuite, créez un fichier DAG qui exécute Cloud Functions sous ~ / airflow / dags. Remplacez l'URL par l'URL Cloud Functions que vous avez créée précédemment.

~/airflow/dags/qiita_sample.py


from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
import requests
import os
from datetime import timedelta

def exec_functions(url):
    payload = {}
    res = requests.post(url, data=payload)
    if res.status_code//100 != 2:
        raise Exception("response status code is not in 200 - 299")

common_args = {
    'owner': os.environ.get("USER", "unknown"),
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'qiita_sample_v0.0',
    default_args=common_args,
    description='sample dag',
    start_date=days_ago(1),
    schedule_interval="00 09 *  *  *", #Courir tous les jours à 9h00 (18h00 heure japonaise)
)

task1 = PythonOperator(
    task_id='qiita_function1',
    python_callable=exec_functions,
    #provide_context=True,
    op_kwargs={
        "url": "xxxxx", #URL des premières Cloud Functions
    },
    dag=dag,
)

task2 = PythonOperator(
    task_id='qiita_function2',
    python_callable=exec_functions,
    #provide_context=True,
    op_kwargs={
        "url": "xxxxx", #URL de la deuxième Cloud Functions
    },
    dag=dag,
)

task1 >> task2 #Spécifier les dépendances des tâches

Après l'avoir créé, exécutez la commande ʻairflow list_dags`, et si ce DAG est affiché, il est correctement reconnu. J'ai ajouté quelques commentaires au code, mais j'ajouterai quelques points.

Enfin, exécutez le code suivant pour démarrer airflow-scheduler en tant que démon et démarrer DAG à l'heure spécifiée. Le processus se poursuit même si vous vous déconnectez de Compute Engine [^ 1].

airflow scheduler -D

finalement

La plupart des pages qui parlent d'Airflow mentionnent également l'interface utilisateur Web, mais je ne l'ai pas fait ici. Parce que lorsque j'ai exécuté la commande ʻairflow webserver` en raison de la limitation de la mémoire de f1-micro, j'étais en colère parce qu'il n'y avait pas assez de mémoire après quelques secondes.

Si vous renoncez à la ré-exécution automatique et à la spécification des dépendances, vous pouvez faire un compromis avec Cloud Scheduler au lieu d'Airflow. Dans ce cas, veuillez noter que l'offre gratuite est limitée à 3 emplois. De plus, dans mon environnement, je ne pouvais pas l'exécuter à partir de Cloud Scheduler, qui spécifie --ingress-settings internal-only lors du déploiement de Cloud Functions. Je pense que nous devons limiter les demandes d'une autre manière.

[^ 1]: Au début, j'ai essayé de démarrer avec systemd, mais cela n'a pas fonctionné et je me suis installé sur ce formulaire.

Recommended Posts

Jouez avec le cadre gratuit GCP ② ~ Airflow (sur Compute Engine), Cloud Functions ~
Jouez avec le cadre gratuit GCP ① ~ Cloud Run, Datastore et API de messagerie LINE ~
Créez un environnement Flask Python3 + sur Compute Engine de GCP
[GCP] Un mémorandum lors de l'exécution d'un programme Python avec Cloud Functions
Émulez les fonctions GCP Cloud localement
Créez un environnement d'exécution Python à l'aide de GPU avec GCP Compute Engine
Libre de codage en dur des fonctions avec SymPy
Utiliser Cloud Datastore depuis Compute Engine
Jouez avec Turtle sur Google Colab
[GCP] Procédure de création d'une application Web avec Cloud Functions (Python + Flask)