So aktivieren Sie, dass Python3 Jobs ausführt, wenn Jobs von GCP Cloud Composer an Dataflow gesendet werden

Was ist in diesem Artikel zu tun?

Wenn Sie einen Auftrag mit "Dataflow Python Operator" von Cloud Composer senden (apache-airflow = = 11.03), lautet die SDK-Version auf der Datenflussseite jetzt "Google Cloud Dataflow SDK für Python 2.5.0", das die Unterstützung beenden soll. Da es am Ende sein wird, werde ich die Version von der Ausführungsumgebung in Python2 auf die Ausführungsumgebung von Python3 von "Apache Beam Python3.x SDK xxx" erhöhen.

スクリーンショット 2020-03-04 12.19.23.png

Zielgruppe

Ausführungsumgebung

Mögliche Ursachen

Wenn Sie einen Auftrag von Cloud Compposer an Dataflow senden, ist die mögliche Ursache für die Ausführung mit "Google Cloud Dataflow SDK für Python 2.5.0" die Implementierung von "Dataflow Python Operator" auf der Luftstromseite.

Schauen Sie sich die Implementierung an

--Initialisieren Sie die Klasse "DataFlowHook" in der Funktion "execute" von "DataflowPythonOperator" und führen Sie die Funktion "start_python_dataflow" aus.

class DataFlowHook(GoogleCloudBaseHook):

    def start_python_dataflow(self, job_name, variables, dataflow, py_options,
                              append_job_name=True):
        name = self._build_dataflow_job_name(job_name, append_job_name)
        variables['job_name'] = name

        def label_formatter(labels_dict):
            return ['--labels={}={}'.format(key, value)
                    for key, value in labels_dict.items()]
        # "python2"Ist fest codiert
        self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
                             label_formatter)

In der zukünftigen Implementierung werden wir einen Befehl zum Senden eines Jobs an Dataflow erstellen. Das Präfix dieses Befehls lautet jedoch weiterhin "python2". Wir werden versuchen, die Datenflussdatei so wie sie ist auszuführen. Die Ausführungsumgebung auf der Datenflussseite lautet also "Google Cloud" Ich frage mich, ob es das Dataflow SDK für Python 2.5.0 sein wird.

Lösung (Stand 03.09.2020)

Führen Sie die folgenden Schritte aus:

1. Installieren Sie "Apache-Beam" in der Cloud Composer-Umgebung

Installieren Sie die folgenden vier Abhängigkeiten, um Apache-Beam zu installieren.

apache-beam==2.15.0
google-api-core==1.14.3
google-apitools==0.5.28
google-cloud-core==1.0.3

Führen Sie den folgenden Befehl aus, um es zu installieren. (Legen Sie require.txt in ein geeignetes Verzeichnis)

environment=your_composer_environment_name
location=your_location

gcloud composer environments update ${environment} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${location}

2. Erstellen Sie eine Klasse, die "DataflowPythonOperator" "DataFlowHook" erbt

Erstellen Sie eine Klasse, die den Luftstrom "DataflowPythonOperator" und "DataFlowHook" erbt, damit die Datenflussdatei mit dem Befehl python3 ausgeführt werden kann.

Referenzlink https://stackoverflow.com/questions/58545759/no-module-named-airfow-gcp-how-to-run-dataflow-job-that-uses-python3-beam-2-15/58631655#58631655

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'dataflow_default_options': {
        'project': YOUR_PROJECT,
        'temp_location': DATAFLOW_TEMP_LOCATION.format(bucket=BUCKET),
        'runner': 'DataflowRunner'
    }
}


class DataFlow3Hook(DataFlowHook):
    def start_python_dataflow(
        self,
        job_name: str,
        variables: Dict,
        dataflow: str,
        py_options: List[str],
        append_job_name: bool = True,
        py_interpreter: str = "python3"
    ):

        name = self._build_dataflow_job_name(job_name, append_job_name)
        variables['job_name'] = name

        def label_formatter(labels_dict):
            return ['--labels={}={}'.format(key, value)
                    for key, value in labels_dict.items()]

        self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
                             label_formatter)


class DataFlowPython3Operator(DataFlowPythonOperator):

    def execute(self, context):
        """Execute the python dataflow job."""
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
        hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to,
                             poll_sleep=self.poll_sleep)
        dataflow_options = self.dataflow_default_options.copy()
        dataflow_options.update(self.options)
        # Convert argument names from lowerCamelCase to snake case.
        camel_to_snake = lambda name: re.sub(
            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
        formatted_options = {camel_to_snake(key): dataflow_options[key]
                             for key in dataflow_options}
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options, py_interpreter="python3")


with airflow.DAG(
        dag_id="airflow_test_dataflow",
        default_args=default_args,
        schedule_interval=None) as dag:

    t1 = DummyOperator(task_id="start")
    t2 = DataFlowPython3Operator(
        py_file=DATAFLOW_PY_FILE,
        task_id="test_job",
        dag=dag)

Durch Angabe von "py_interpreter =" python3 "" im Argument "start_python_dataflow", das in der Funktion "execute" der Klasse "DataFlowPython3Operator" ausgeführt wird, können Sie die Datenflussdatei mit dem Befehl "python3" ausführen. Ich werde.

Es ist in Ordnung, wenn Sie bestätigen können, dass es mit der unten gezeigten Version von "Apache Beam Python3.6 SDK 2.15.0" ausgeführt werden kann.

スクリーンショット 2020-03-05 11.51.34.png スクリーンショット 2020-03-05 12.26.27.png

Bemerkungen

Ein modifizierter PR wurde erstellt, damit der Befehl "python3" mit dem "DataflowPythonOperator" von airflow ausgeführt werden kann, und wurde in airflow 2.0 und höher zusammengeführt.

Recommended Posts

So aktivieren Sie, dass Python3 Jobs ausführt, wenn Jobs von GCP Cloud Composer an Dataflow gesendet werden
Führen Sie Cloud Dataflow (Python) über AppEngine aus
So rufen Sie die Cloud-API über GCP-Cloud-Funktionen auf
So führen Sie ein Python-Programm in einem Shell-Skript aus
[GCP] Ausgeben von Cloud-Funktionsprotokollen an Cloud Logging (Stackdriver Logging) (Python)
So greifen Sie über Python auf Wikipedia zu
So installieren Sie OpenCV in Cloud9 und führen es in Python aus
[GCP] So veröffentlichen Sie eine mit Cloud Storage signierte URL (temporäre URL) in Python
So vermeiden Sie doppelte Daten bei der Eingabe von Python in SQLite.
Herstellen einer Verbindung zum Cloud Firestore über Google Cloud-Funktionen mit Python-Code
So aktualisieren Sie die Python-Version von Cloud Shell in GCP
So aktualisieren Sie Google Sheets von Python
Umgang mit OAuth2-Fehlern bei Verwendung von Google APIs aus Python
Zugriff auf RDS von Lambda (Python)
So wechseln Sie die Python-Version in Cloud9
So führen Sie Maya Python-Skripte aus
So starten Sie Python (Flask) beim Start von EC2
Studie aus Python Hour7: Verwendung von Klassen
[Python] Lesen von Daten aus CIFAR-10 und CIFAR-100
GCP: Wiederholen Sie den Vorgang von Pub / Sub zu Cloud-Funktionen und von Cloud-Funktionen zu Pub / Sub
So führen Sie MeCab unter Ubuntu 18.04 LTS Python aus
So generieren Sie ein Python-Objekt aus JSON
Wie man gut mit Linux-Befehlen aus Python umgeht
So führen Sie LeapMotion mit Nicht-Apple Python aus
Was ich beim Update von Python 2.6 auf 2.7 gemacht habe
Verwendung von Ruby's PyCall zum Aktivieren von pyenv Python
Übergeben von Argumenten beim Aufrufen von Python-Skripten über Blender in der Befehlszeile
[Python] So führen Sie Jupyter-Notebook + Pandas + Multiprocessing (Pool) [Pandas] Memo aus
[Python] So entfernen Sie doppelte Werte aus der Liste
So kratzen Sie Bilddaten von Flickr mit Python
Führen Sie eine Pipeline für maschinelles Lernen mit Cloud Dataflow (Python) aus.
So führen Sie Python im virtuellen Raum aus (für MacOS)
So führen Sie Tests zusammen mit Python unittest aus
Python - Hinweise beim Konvertieren vom Typ str in den Typ int
Wie man setUp nur einmal in Python unittest ausführt
So laden Sie Dateien von Selenium of Python in Chrome herunter
Beenden bei Verwendung von Python in Terminal (Mac)
Führen Sie die Python-Funktion von Powershell aus (wie Sie Argumente übergeben).
[Python] So rufen Sie eine Funktion von c aus Python auf (ctypes edition)
So installieren Sie Python
Änderungen von Python 3.0 zu Python 3.5
Änderungen von Python 2 zu Python 3.0
So installieren Sie Python
Führen Sie Python aus Excel aus
Cloud Run Tutorial (Python)
So schneiden Sie ein Block-Multiple-Array aus einem Multiple-Array in Python
So führen Sie eine Python-Datei an einer Windows 10-Eingabeaufforderung aus
Verliere nicht gegen Ruby! Wie man Python (Django) auf Heroku ausführt
So starten Sie AWS Batch über die Python-Client-App
Herstellen einer Verbindung zu verschiedenen DBs über Python (PEP 249) und SQL Alchemy
Hochladen von Dateien in den Cloud-Speicher mit dem Python-SDK von Firebase
So führen Sie die Exportfunktion des GCP-Datenspeichers automatisch aus
[GCP] Ein Memorandum zum Ausführen eines Python-Programms mit Cloud-Funktionen
Wie man aus einer Wahrscheinlichkeitsdichtefunktion in Python tastet
So führen Sie eine mit Python + py2app erstellte App aus, die mit Anaconda erstellt wurde
Ermöglichen Sie die schnelle Ausführung von Python-Skripten in Cloud Run mithilfe des Responders
Wie man Python oder Julia von Ruby aus aufruft (experimentelle Implementierung)