How to enable python3 to run when sending jobs from GCP Cloud Composer to Dataflow

What this article covers

When sending a job from Cloud Composer (`apache-airflow==1.10.3`) with `DataflowPythonOperator`, the job runs on `Google Cloud Dataflow SDK for Python 2.5.0`, whose support is scheduled to end. To avoid that, I will upgrade from the Python 2 execution environment to the Python 3 execution environment of `Apache Beam Python3.x SDK x.x.x`.

(Screenshot: the Dataflow console showing the job running on Google Cloud Dataflow SDK for Python 2.5.0)

Target audience

- Those who have used Cloud Composer / Airflow

Execution environment

Possible causes

When sending a job from Cloud Composer to Dataflow, the likely reason it runs on Google Cloud Dataflow SDK for Python 2.5.0 is the implementation of `DataflowPythonOperator` on the Airflow side.

Take a look at the implementation

- The `execute` method of `DataflowPythonOperator` initializes the `DataFlowHook` class and calls its `start_python_dataflow` method.

- In `start_python_dataflow` of `DataFlowHook`, `python2` is hard-coded as part of the `command_prefix` argument passed to the `_start_dataflow` method:

class DataFlowHook(GoogleCloudBaseHook):

    def start_python_dataflow(self, job_name, variables, dataflow, py_options,
                              append_job_name=True):
        name = self._build_dataflow_job_name(job_name, append_job_name)
        variables['job_name'] = name

        def label_formatter(labels_dict):
            return ['--labels={}={}'.format(key, value)
                    for key, value in labels_dict.items()]
        # "python2"Is hard-coded
        self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
                             label_formatter)

This implementation builds the command that submits the job to Dataflow, but because the command prefix is still `python2`, the Dataflow file is executed with Python 2 as-is. That is presumably why the execution environment on the Dataflow side becomes Google Cloud Dataflow SDK for Python 2.5.0.
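To make this concrete, here is a minimal sketch of how the submission command ends up being assembled (my own illustration, not the actual airflow source; the path and option values are hypothetical):

# Simplified sketch: _start_dataflow runs command_prefix + options as a subprocess.
command_prefix = ["python2"]         # the hard-coded interpreter
py_options = []                      # interpreter options such as ["-m"]
dataflow = "/tmp/dataflow_job.py"    # hypothetical local path of the Dataflow file

command = command_prefix + py_options + [dataflow] + [
    "--runner=DataflowRunner",
    "--project=your-project",        # hypothetical
    "--job_name=airflow-test-job",   # hypothetical
]

# => python2 /tmp/dataflow_job.py --runner=DataflowRunner --project=your-project --job_name=airflow-test-job
print(" ".join(command))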

Solution (as of 03/09/2020)

Do the following in order:

1. Install `apache-beam` in the Cloud Composer environment

Install the following four packages (`apache-beam` plus its pinned dependencies):

apache-beam==2.15.0
google-api-core==1.14.3
google-apitools==0.5.28
google-cloud-core==1.0.3

To install them, run the following command (place `requirements.txt` in a suitable directory):

environment=your_composer_environment_name
location=your_location

gcloud composer environments update ${environment} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${location}
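To check that the packages were actually installed, one option (my own addition, not in the original article) is to look at the environment's PyPI package settings:

gcloud composer environments describe ${environment} \
--location ${location} \
--format="value(config.softwareConfig.pypiPackages)"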

2. Create classes that inherit `DataflowPythonOperator` and `DataFlowHook`

Create classes that inherit Airflow's `DataflowPythonOperator` and `DataFlowHook` so that the Dataflow file is executed with the `python3` command.

Reference link https://stackoverflow.com/questions/58545759/no-module-named-airfow-gcp-how-to-run-dataflow-job-that-uses-python3-beam-2-15/58631655#58631655

import re
from datetime import timedelta
from typing import Dict, List

import airflow
from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
from airflow.contrib.operators.dataflow_operator import (
    DataFlowPythonOperator,
    GoogleCloudBucketHelper,
)
from airflow.operators.dummy_operator import DummyOperator

# YOUR_PROJECT, DATAFLOW_TEMP_LOCATION, BUCKET and DATAFLOW_PY_FILE are
# placeholders to be defined for your environment.
default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'dataflow_default_options': {
        'project': YOUR_PROJECT,
        'temp_location': DATAFLOW_TEMP_LOCATION.format(bucket=BUCKET),
        'runner': 'DataflowRunner'
    }
}


class DataFlow3Hook(DataFlowHook):
    def start_python_dataflow(
        self,
        job_name: str,
        variables: Dict,
        dataflow: str,
        py_options: List[str],
        append_job_name: bool = True,
        py_interpreter: str = "python3"
    ):

        name = self._build_dataflow_job_name(job_name, append_job_name)
        variables['job_name'] = name

        def label_formatter(labels_dict):
            return ['--labels={}={}'.format(key, value)
                    for key, value in labels_dict.items()]

        self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
                             label_formatter)


class DataFlowPython3Operator(DataFlowPythonOperator):

    def execute(self, context):
        """Execute the python dataflow job."""
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
        hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to,
                             poll_sleep=self.poll_sleep)
        dataflow_options = self.dataflow_default_options.copy()
        dataflow_options.update(self.options)
        # Convert argument names from lowerCamelCase to snake case.
        camel_to_snake = lambda name: re.sub(
            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
        formatted_options = {camel_to_snake(key): dataflow_options[key]
                             for key in dataflow_options}
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options, py_interpreter="python3")


with airflow.DAG(
        dag_id="airflow_test_dataflow",
        default_args=default_args,
        schedule_interval=None) as dag:

    t1 = DummyOperator(task_id="start")
    t2 = DataFlowPython3Operator(
        py_file=DATAFLOW_PY_FILE,
        task_id="test_job",
        dag=dag)

    # Run the Dataflow task after the start task.
    t1 >> t2

By passing `py_interpreter="python3"` to `start_python_dataflow`, which is called from the `execute` method of the `DataFlowPython3Operator` class, the Dataflow file is executed with the `python3` command.

It is OK if you can confirm that the job was executed with `Apache Beam Python3.6 SDK 2.15.0`, as shown below.

(Screenshots: the Dataflow console showing the job running on Apache Beam Python3.6 SDK 2.15.0)
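If you also want to verify the interpreter and SDK version from inside the pipeline code, a minimal check (my own addition, not from the original article) looks like this:

import sys

import apache_beam

# Expect a 3.6.x interpreter and Beam 2.15.0 for Apache Beam Python3.6 SDK 2.15.0.
print(sys.version)
print(apache_beam.__version__)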

Remarks

A PR that enables the `python3` command to be used with Airflow's `DataflowPythonOperator` has been created and merged for Airflow 2.0 and later.

- issue: JIRA
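For reference, newer Airflow releases expose the interpreter as an operator argument, so a subclass like the one above is no longer needed. A sketch assuming the Airflow 2.x Google provider (check the operator name and import path for your version):

from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator

t2 = DataflowCreatePythonJobOperator(
    task_id="test_job",
    py_file=DATAFLOW_PY_FILE,   # same placeholder as in the DAG above
    py_interpreter="python3",   # configurable instead of hard-coded
)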
