Je vais vous présenter ce que vous pouvez faire avec le cadre gratuit de GCP à plusieurs reprises (l'article précédent ici). Le contenu de l'offre gratuite est sujet à changement, et les paiements à l'utilisation peuvent être facturés si la limite est dépassée, donc Informations officielles Veuillez utiliser à vos risques et périls en vérifiant = ja).
L'objectif cette fois-ci est d'exécuter ** Cloud Functions ** à partir de ** Apache Airflow ** (ci-après Airflow) installé sur ** Compute Engine **. Cela peut être une configuration un peu étrange, mais j'utilise en fait cette configuration pour automatiser la collecte et la transmission d'informations sur Twitter (ici).
Au fait, j'ai choisi cette configuration pour les raisons suivantes. Si vous avez de l'argent, utilisez ** Cloud Composer ** docilement.
Cloud Functions Un service GCP qui vous permet d'exécuter du code sans serveur. Cette fois, nous exécuterons le code Python en fonction de la requête HTTP. Vous pouvez également exécuter à partir de Cloud Pub / Sub ou d'un code autre que Python.
Compute Engine Machine virtuelle GCP. S'il s'agit d'un cadre libre, vous pouvez utiliser un type de machine appelé f1-micro (cependant, la mémoire est de 0,6 Go et les spécifications sont modestes). Vous pouvez également choisir le système d'exploitation d'Ubuntu, Debian, CentOS, etc.
Airflow Développé à l'origine par Airbnb, il s'agit d'un cadre de gestion des flux de travail. Cela ressemble à une version améliorée de cron, mais il présente les avantages suivants.
GCP ne semble pas avoir de niveau gratuit pour un service appelé Cloud Composer, donc cette fois je l'installerai moi-même sur le niveau gratuit Compute Engine.
Ici, nous allons créer les deux fonctions suivantes.
Les deux fichiers suivants sont préparés. Normalement, je pense que le fichier est séparé pour chaque fonction, mais cette fois, il est facile de rendre à la fois main.py et requirements.py communs aux deux fonctions.
Tout d'abord, le contenu de main.py est le suivant. Pour plus d'informations sur LINE Notify, consultez l'article ici.
main.py
import requests
import json
import datetime
from google.cloud import storage
# function1...Obtenez la météo de demain à partir de l'API et enregistrez-la dans Cloud Storage
def function1(request):
url = "http://weather.livedoor.com/forecast/webservice/json/v1"
payload = {"city": 130010} #Tokyo
res = requests.get(url, params=payload)
res_json = json.loads(res.text.replace("\n", "")) # "\n"Provoque une erreur, alors remplacez
tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
forecast = [x for x in res_json["forecasts"] if x["date"] == tomorrow.strftime("%Y-%m-%d")][0]
client = storage.Client()
bucket = client.get_bucket("xxxxx") #Créez et remplacez votre propre bucket
blob = bucket.blob("forecast.json")
blob.upload_from_string(json.dumps(forecast))
# function2...Obtenez la météo de demain depuis Cloud Storage et notifiez avec LINE Notify
def send_message(msg):
url = "https://notify-api.line.me/api/notify"
token = "xxxxx" #Remplacez par votre propre jeton
payload = {"message": msg}
headers = {"Authorization": "Bearer {}".format(token)}
requests.post(url, data=payload, headers=headers)
def function2(requests):
client = storage.Client()
bucket = client.get_bucket("xxxxx") #Créez et remplacez votre propre bucket
blob = bucket.blob("forecast.json")
forecast = json.loads(blob.download_as_string())
send_message(forecast["telop"])
Ensuite, requirements.txt ressemble à ceci:
requirements.txt
requests==2.22.0
google-cloud-storage==1.26.0
À ce stade, dans le répertoire où se trouvent main.py et requirements.txt, exécutez ce qui suit pour déployer. Le but est d'interdire les requêtes HTTP de l'extérieur avec --ingress-settings internal-only
. Vous pouvez le demander à Compute Engine que vous créerez ultérieurement sans aucun problème.
gcloud functions deploy qiita_function1 --entry-point function1 --runtime python37 --trigger-http --ingress-settings internal-only --allow-unauthenticated
gcloud functions deploy qiita_function2 --entry-point function2 --runtime python37 --trigger-http --ingress-settings internal-only --allow-unauthenticated
Déplaçons-le de la console Cloud Functions pour voir s'il fonctionne réellement. Vérifiez également l'URL du déclencheur (dans un format comme https: // us-central1- <PROJECT> .cloudfunctions.net / qiita_function1
).
Commencez par créer un moteur Compute Engine à partir de la console GCP. Dans l'environnement où j'ai vérifié l'opération, la partie du cadre rouge de l'image ci-dessous est modifiée par défaut.
Une fois connecté, exécutez le code suivant. L'installation de Python3 et l'installation d'Airflow sont terminées. J'utiliserai les requêtes plus tard, alors installons-les ensemble. À propos, au moment de la vérification du fonctionnement, la version d'apache-airflow était la 1.10.9.
sudo apt update
sudo apt -y install python3.7 python3-pip
pip3 install --upgrade apache-airflow requests
Une fois déconnecté, connectez-vous à nouveau et initialisez la base de données avec le code suivant. Par défaut, ~ / airflow
est le répertoire de base d'Airflow.
airflow initdb
Puis éditez ~ / airflow / airflow.cfg
pour changer les paramètres. Trouvez la pièce appropriée et procédez comme suit.
~/airflow/airflow.cfg
#Ne vous arrêtez pas lorsque le DAG est reconnu (nécessite une commande de réactivation du flux d'air explicite si elle est laissée à True)
dags_are_paused_at_creation = False
#Ne pas exécuter les dernières minutes lors de l'exécution de DAG avec une date de début dans le passé
catchup_by_default = False
#Ne pas afficher l'exemple de DAG
load_examples = False
Ensuite, créez un fichier DAG qui exécute Cloud Functions sous ~ / airflow / dags
. Remplacez l'URL par l'URL Cloud Functions que vous avez créée précédemment.
~/airflow/dags/qiita_sample.py
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
import requests
import os
from datetime import timedelta
def exec_functions(url):
payload = {}
res = requests.post(url, data=payload)
if res.status_code//100 != 2:
raise Exception("response status code is not in 200 - 299")
common_args = {
'owner': os.environ.get("USER", "unknown"),
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'qiita_sample_v0.0',
default_args=common_args,
description='sample dag',
start_date=days_ago(1),
schedule_interval="00 09 * * *", #Courir tous les jours à 9h00 (18h00 heure japonaise)
)
task1 = PythonOperator(
task_id='qiita_function1',
python_callable=exec_functions,
#provide_context=True,
op_kwargs={
"url": "xxxxx", #URL des premières Cloud Functions
},
dag=dag,
)
task2 = PythonOperator(
task_id='qiita_function2',
python_callable=exec_functions,
#provide_context=True,
op_kwargs={
"url": "xxxxx", #URL de la deuxième Cloud Functions
},
dag=dag,
)
task1 >> task2 #Spécifier les dépendances des tâches
Après l'avoir créé, exécutez la commande ʻairflow list_dags`, et si ce DAG est affiché, il est correctement reconnu. J'ai ajouté quelques commentaires au code, mais j'ajouterai quelques points.
est définie au début et spécifiée dans
Python Operator. Notez que la manière dont les arguments sont passés est un peu spéciale et est spécifiée par ʻop_kwargs
. Au fait, si vous définissez provide_context = True
, vous pouvez également transmettre des informations telles que le DAG et le nom de la tâche à la fonction.task1 >> task2
spécifie les dépendances de la tâche. Dans ce cas, il est gênant d'envoyer un message même si les prévisions météo n'ont pas été mises à jour par l'API, c'est donc spécifié comme ceci.Enfin, exécutez le code suivant pour démarrer airflow-scheduler en tant que démon et démarrer DAG à l'heure spécifiée. Le processus se poursuit même si vous vous déconnectez de Compute Engine [^ 1].
airflow scheduler -D
La plupart des pages qui parlent d'Airflow mentionnent également l'interface utilisateur Web, mais je ne l'ai pas fait ici. Parce que lorsque j'ai exécuté la commande ʻairflow webserver` en raison de la limitation de la mémoire de f1-micro, j'étais en colère parce qu'il n'y avait pas assez de mémoire après quelques secondes.
Si vous renoncez à la ré-exécution automatique et à la spécification des dépendances, vous pouvez faire un compromis avec Cloud Scheduler au lieu d'Airflow. Dans ce cas, veuillez noter que l'offre gratuite est limitée à 3 emplois. De plus, dans mon environnement, je ne pouvais pas l'exécuter à partir de Cloud Scheduler, qui spécifie --ingress-settings internal-only
lors du déploiement de Cloud Functions. Je pense que nous devons limiter les demandes d'une autre manière.
[^ 1]: Au début, j'ai essayé de démarrer avec systemd, mais cela n'a pas fonctionné et je me suis installé sur ce formulaire.
Recommended Posts