Wenn Sie einen Auftrag mit "Dataflow Python Operator" von Cloud Composer senden (apache-airflow = = 11.03), lautet die SDK-Version auf der Datenflussseite jetzt "Google Cloud Dataflow SDK für Python 2.5.0", das die Unterstützung beenden soll. Da es am Ende sein wird, werde ich die Version von der Ausführungsumgebung in Python2 auf die Ausführungsumgebung von Python3 von "Apache Beam Python3.x SDK xxx" erhöhen.
Wenn Sie einen Auftrag von Cloud Compposer an Dataflow senden, ist die mögliche Ursache für die Ausführung mit "Google Cloud Dataflow SDK für Python 2.5.0" die Implementierung von "Dataflow Python Operator" auf der Luftstromseite.
Schauen Sie sich die Implementierung an
--Initialisieren Sie die Klasse "DataFlowHook" in der Funktion "execute" von "DataflowPythonOperator" und führen Sie die Funktion "start_python_dataflow" aus.
https://github.com/apache/airflow/blob/ffd65440a0b730dcf524934225a65676045ce1f8/airflow/contrib/operators/dataflow_operator.py#L379
Die Funktion "start_python_dataflow" von "DataFlowHook" wird mit "python2" ausgeführt, das als Teil des Arguments "command_prefix" der Funktion "_start_dataflow" fest codiert ist.
https://github.com/apache/airflow/blob/ffd65440a0b730dcf524934225a65676045ce1f8/airflow/contrib/hooks/gcp_dataflow_hook.py#L239
class DataFlowHook(GoogleCloudBaseHook):
def start_python_dataflow(self, job_name, variables, dataflow, py_options,
append_job_name=True):
name = self._build_dataflow_job_name(job_name, append_job_name)
variables['job_name'] = name
def label_formatter(labels_dict):
return ['--labels={}={}'.format(key, value)
for key, value in labels_dict.items()]
# "python2"Ist fest codiert
self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
label_formatter)
In der zukünftigen Implementierung werden wir einen Befehl zum Senden eines Jobs an Dataflow erstellen. Das Präfix dieses Befehls lautet jedoch weiterhin "python2". Wir werden versuchen, die Datenflussdatei so wie sie ist auszuführen. Die Ausführungsumgebung auf der Datenflussseite lautet also "Google Cloud" Ich frage mich, ob es das Dataflow SDK für Python 2.5.0 sein wird.
Führen Sie die folgenden Schritte aus:
Installieren Sie die folgenden vier Abhängigkeiten, um Apache-Beam zu installieren.
apache-beam==2.15.0
google-api-core==1.14.3
google-apitools==0.5.28
google-cloud-core==1.0.3
Führen Sie den folgenden Befehl aus, um es zu installieren.
(Legen Sie require.txt
in ein geeignetes Verzeichnis)
environment=your_composer_environment_name
location=your_location
gcloud composer environments update ${environment} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${location}
Erstellen Sie eine Klasse, die den Luftstrom "DataflowPythonOperator" und "DataFlowHook" erbt, damit die Datenflussdatei mit dem Befehl python3 ausgeführt werden kann.
Referenzlink https://stackoverflow.com/questions/58545759/no-module-named-airfow-gcp-how-to-run-dataflow-job-that-uses-python3-beam-2-15/58631655#58631655
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 1,
'retry_delay': timedelta(minutes=1),
'dataflow_default_options': {
'project': YOUR_PROJECT,
'temp_location': DATAFLOW_TEMP_LOCATION.format(bucket=BUCKET),
'runner': 'DataflowRunner'
}
}
class DataFlow3Hook(DataFlowHook):
def start_python_dataflow(
self,
job_name: str,
variables: Dict,
dataflow: str,
py_options: List[str],
append_job_name: bool = True,
py_interpreter: str = "python3"
):
name = self._build_dataflow_job_name(job_name, append_job_name)
variables['job_name'] = name
def label_formatter(labels_dict):
return ['--labels={}={}'.format(key, value)
for key, value in labels_dict.items()]
self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
label_formatter)
class DataFlowPython3Operator(DataFlowPythonOperator):
def execute(self, context):
"""Execute the python dataflow job."""
bucket_helper = GoogleCloudBucketHelper(
self.gcp_conn_id, self.delegate_to)
self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
dataflow_options = self.dataflow_default_options.copy()
dataflow_options.update(self.options)
# Convert argument names from lowerCamelCase to snake case.
camel_to_snake = lambda name: re.sub(
r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
formatted_options = {camel_to_snake(key): dataflow_options[key]
for key in dataflow_options}
hook.start_python_dataflow(
self.job_name, formatted_options,
self.py_file, self.py_options, py_interpreter="python3")
with airflow.DAG(
dag_id="airflow_test_dataflow",
default_args=default_args,
schedule_interval=None) as dag:
t1 = DummyOperator(task_id="start")
t2 = DataFlowPython3Operator(
py_file=DATAFLOW_PY_FILE,
task_id="test_job",
dag=dag)
Durch Angabe von "py_interpreter =" python3 "" im Argument "start_python_dataflow", das in der Funktion "execute" der Klasse "DataFlowPython3Operator" ausgeführt wird, können Sie die Datenflussdatei mit dem Befehl "python3" ausführen. Ich werde.
Es ist in Ordnung, wenn Sie bestätigen können, dass es mit der unten gezeigten Version von "Apache Beam Python3.6 SDK 2.15.0" ausgeführt werden kann.
Ein modifizierter PR wurde erstellt, damit der Befehl "python3" mit dem "DataflowPythonOperator" von airflow ausgeführt werden kann, und wurde in airflow 2.0 und höher zusammengeführt.
Ausgabe Jira
https://issues.apache.org/jira/browse/AIRFLOW-4983
PullRrequest
Recommended Posts