[PYTHON] Spielen Sie mit GCP Free Frame ② ~ Luftstrom (auf Compute Engine), Cloud-Funktionen ~

Über diesen Artikel

Ich werde mehrmals vorstellen, was Sie mit dem kostenlosen Frame von GCP tun können (der vorherige Artikel hier). Der Inhalt der kostenlosen Stufe kann sich ändern, und Pay-as-you-go-Gebühren können erhoben werden, wenn das Limit überschritten wird. Offizielle Informationen Bitte verwenden Sie auf eigenes Risiko bei der Überprüfung = ja).

Das Ziel dieses Mal ist es, ** Cloud-Funktionen ** von ** Apache Airflow ** (im Folgenden Airflow) auszuführen, der auf ** Compute Engine ** installiert ist. Es mag eine etwas seltsame Konfiguration sein, aber ich verwende diese Konfiguration tatsächlich, um das Sammeln und Übertragen von Informationen auf Twitter (hier) zu automatisieren.

Übrigens habe ich mich aus folgenden Gründen für diese Konfiguration entschieden. Wenn Sie Geld haben, verwenden Sie gehorsam ** Cloud Composer **.

Eine kurze Einführung zu jedem Service

Cloud Functions Ein GCP-Dienst, mit dem Sie Code serverlos ausführen können. Dieses Mal führen wir den Python-Code gemäß der HTTP-Anforderung aus. Sie können auch von Cloud Pub / Sub oder einem anderen Code als Python ausgeführt werden.

Compute Engine Virtuelle GCP-Maschine. Wenn es sich um einen freien Frame handelt, können Sie einen Maschinentyp namens f1-micro verwenden (der Speicher beträgt jedoch 0,6 GB und die technischen Daten sind bescheiden). Sie können das Betriebssystem auch unter Ubuntu, Debian, CentOS usw. auswählen.

Airflow Ursprünglich von Airbnb entwickelt, ist es ein Framework für die Verwaltung von Workflows. Es sieht aus wie eine verbesserte Version von cron, hat aber die folgenden Vorteile.

GCP scheint keine kostenlose Stufe für einen Dienst namens Cloud Composer zu haben, daher werde ich sie dieses Mal selbst auf der kostenlosen Schicht Compute Engine installieren.

Einstellungen für Cloud-Funktionen

Hier erstellen wir die folgenden zwei Funktionen.

  1. Holen Sie sich das Wetter von morgen von API und speichern Sie es im Cloud-Speicher
  2. Holen Sie sich das Wetter von morgen aus dem Cloud-Speicher und benachrichtigen Sie mit LINE Notify

Die folgenden zwei Dateien werden vorbereitet. Normalerweise denke ich, dass die Datei für jede Funktion getrennt ist, aber dieses Mal ist es einfach, main.py und require.py für beide Funktionen gemeinsam zu machen.

Erstens ist der Inhalt von main.py wie folgt. Weitere Informationen zu LINE Notify finden Sie im Artikel hier.

main.py


import requests
import json
import datetime
from google.cloud import storage

# function1...Holen Sie sich das Wetter von morgen von der API und speichern Sie es im Cloud-Speicher
def function1(request):
    url = "http://weather.livedoor.com/forecast/webservice/json/v1"
    payload = {"city": 130010} #Tokio
    res = requests.get(url, params=payload)
    res_json = json.loads(res.text.replace("\n", "")) # "\n"Verursacht einen Fehler, also ersetzen
    tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
    forecast = [x for x in res_json["forecasts"] if x["date"] == tomorrow.strftime("%Y-%m-%d")][0]
    client = storage.Client()
    bucket = client.get_bucket("xxxxx") #Erstellen und ersetzen Sie Ihren eigenen Eimer
    blob = bucket.blob("forecast.json")
    blob.upload_from_string(json.dumps(forecast))

# function2...Holen Sie sich das Wetter von morgen aus dem Cloud-Speicher und benachrichtigen Sie mit LINE Notify
def send_message(msg):
    url = "https://notify-api.line.me/api/notify"
    token = "xxxxx" #Ersetzen Sie durch Ihren eigenen Token
    payload = {"message": msg}
    headers = {"Authorization": "Bearer {}".format(token)}
    requests.post(url, data=payload, headers=headers)

def function2(requests):
    client = storage.Client()
    bucket = client.get_bucket("xxxxx") #Erstellen und ersetzen Sie Ihren eigenen Eimer
    blob = bucket.blob("forecast.json")
    forecast = json.loads(blob.download_as_string())
    send_message(forecast["telop"])

Als nächstes sieht die Anforderung.txt folgendermaßen aus:

requirements.txt


requests==2.22.0
google-cloud-storage==1.26.0

Führen Sie zu diesem Zeitpunkt in dem Verzeichnis, in dem sich main.py und require.txt befinden, Folgendes aus, um es bereitzustellen. Es geht darum, HTTP-Anfragen von außen mit "--ingress-settings internal-only" zu verbieten. Sie können es von der Compute Engine anfordern, die Sie später problemlos erstellen werden.

gcloud functions deploy qiita_function1 --entry-point function1 --runtime python37 --trigger-http --ingress-settings internal-only --allow-unauthenticated
gcloud functions deploy qiita_function2 --entry-point function2 --runtime python37 --trigger-http --ingress-settings internal-only --allow-unauthenticated

Verschieben wir es von der Cloud-Funktionskonsole, um zu sehen, ob es tatsächlich funktioniert. Überprüfen Sie auch die Trigger-URL (in einem Format wie "https: // us-central1- .cloudfunctions.net / qiita_function1").

function.PNG

Einführung in Airflow

Erstellen Sie zunächst eine Compute Engine über die GCP-Konsole. In der Umgebung, in der ich den Vorgang überprüft habe, wird der rote Rahmenteil des folgenden Bilds von der Standardeinstellung geändert.

gce.PNG

Führen Sie nach dem Anmelden den folgenden Code aus. Die Python3-Installation und die Airflow-Installation sind abgeschlossen. Ich werde Anfragen später verwenden, also installieren wir sie zusammen. Zum Zeitpunkt der Betriebsüberprüfung war die Version des Apache-Luftstroms übrigens 1.10.9.

sudo apt update
sudo apt -y install python3.7 python3-pip
pip3 install --upgrade apache-airflow requests

Melden Sie sich nach dem Abmelden erneut an und initialisieren Sie die Datenbank mit dem folgenden Code. Standardmäßig ist "~ / airflow" das Airflow-Ausgangsverzeichnis.

airflow initdb

Bearbeiten Sie dann ~ / airflow / airflow.cfg, um die Einstellungen zu ändern. Suchen Sie das relevante Teil und gehen Sie wie folgt vor.

~/airflow/airflow.cfg


#Stoppen Sie nicht, wenn die DAG erkannt wird (erfordert einen expliziten Befehl zum Unterbrechen des Luftstroms, wenn True aktiviert bleibt).
dags_are_paused_at_creation = False

#Laufen Sie nicht länger als Minuten, wenn Sie DAG mit einem Startdatum in der Vergangenheit ausführen
catchup_by_default = False

#DAG-Beispiel nicht anzeigen
load_examples = False

Erstellen Sie als Nächstes eine DAG-Datei, in der Cloud-Funktionen unter "~ / airflow / dags" ausgeführt werden. Ersetzen Sie die URL durch die zuvor erstellte URL für Cloud-Funktionen.

~/airflow/dags/qiita_sample.py


from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
import requests
import os
from datetime import timedelta

def exec_functions(url):
    payload = {}
    res = requests.post(url, data=payload)
    if res.status_code//100 != 2:
        raise Exception("response status code is not in 200 - 299")

common_args = {
    'owner': os.environ.get("USER", "unknown"),
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'qiita_sample_v0.0',
    default_args=common_args,
    description='sample dag',
    start_date=days_ago(1),
    schedule_interval="00 09 *  *  *", #Laufen Sie jeden Tag um 9 Uhr (18:00 Uhr japanischer Zeit).
)

task1 = PythonOperator(
    task_id='qiita_function1',
    python_callable=exec_functions,
    #provide_context=True,
    op_kwargs={
        "url": "xxxxx", #URL der ersten Cloud-Funktionen
    },
    dag=dag,
)

task2 = PythonOperator(
    task_id='qiita_function2',
    python_callable=exec_functions,
    #provide_context=True,
    op_kwargs={
        "url": "xxxxx", #URL der zweiten Cloud-Funktionen
    },
    dag=dag,
)

task1 >> task2 #Geben Sie Aufgabenabhängigkeiten an

Führen Sie nach dem Erstellen den Befehl airflow list_dags aus. Wenn diese DAG angezeigt wird, wird sie korrekt erkannt. Ich habe dem Code einige Kommentare hinzugefügt, aber ich werde ein paar Punkte hinzufügen.

Führen Sie abschließend den folgenden Code aus, um den Luftstromplaner als Dämon zu starten und die DAG zum angegebenen Zeitpunkt zu starten. Der Vorgang wird auch dann fortgesetzt, wenn Sie sich von der Compute Engine abmelden [^ 1].

airflow scheduler -D

Schließlich

Die meisten Seiten, die über Airflow sprechen, erwähnen auch die Web-Benutzeroberfläche, aber ich habe es hier nicht getan. Denn als ich den Befehl airflow webserver aufgrund der Speicherbeschränkung von f1-micro ausführte, war ich wütend, dass nach einigen Sekunden nicht genügend Speicher vorhanden war.

Wenn Sie auf die automatische Neuausführung und die Angabe von Abhängigkeiten verzichten, können Sie mit Cloud Scheduler anstelle von Airflow Kompromisse eingehen. In diesem Fall beachten Sie bitte, dass die kostenlose Stufe auf 3 Jobs begrenzt ist. Außerdem konnte ich es in meiner Umgebung nicht über den Cloud Scheduler ausführen, der beim Bereitstellen von Cloud-Funktionen "--ingress-settings only-internal" angibt. Ich denke, wir müssen Anfragen auf andere Weise begrenzen.

[^ 1]: Zuerst habe ich versucht, mit systemd zu beginnen, aber es hat nicht funktioniert und ich habe mich für dieses Formular entschieden.

Recommended Posts

Spielen Sie mit GCP Free Frame ② ~ Luftstrom (auf Compute Engine), Cloud-Funktionen ~
Spielen Sie mit dem kostenlosen GCP-Frame ~ Cloud Run, Datastore & LINE Messaging API ~
Erstellen Sie eine Python3 + -Kolbenumgebung auf der Compute Engine von GCP
[GCP] Ein Memorandum zum Ausführen eines Python-Programms mit Cloud-Funktionen
Emulieren Sie GCP Cloud-Funktionen lokal
Erstellen Sie eine Python-Ausführungsumgebung mithilfe der GPU mit der GCP Compute Engine
Frei von der harten Codierung von Funktionen mit SymPy
Verwenden Sie den Cloud-Datenspeicher von Compute Engine
Spielen Sie mit Turtle auf Google Colab
[GCP] Verfahren zum Erstellen einer Webanwendung mit Cloud-Funktionen (Python + Flask)