[GCP] Steps to deploy DataFlow on Cloud Shell (using Python)

Introduction

I couldn't find a clear explanation of how to deploy Dataflow, nor a sample program simple enough to just run, so I'm summarizing the steps here as a memorandum.

Procedure

1. Install apache_beam

Execute the following command in Cloud Shell.

sudo pip3 install apache_beam[gcp]
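
To confirm that the installation succeeded before moving on, you can print the installed Beam version (a quick sanity check):

python3 -c "import apache_beam as beam; print(beam.__version__)"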

Do not install it the following way; an error will occur in beam.io.ReadFromText.

sudo pip install apache_beam

To install apache_beam in a virtual environment instead, the steps are as follows.

# Create a working folder
mkdir python2
cd python2

# Create a virtual environment
python -m virtualenv env

# Activate it
source env/bin/activate

# Install apache-beam
pip install apache-beam[gcp]

2. Program creation

What I created this time is as simple as it gets: it just reads the file read.txt directly under the specified bucket and writes its contents to the file write.txt.

If you want to try it yourself, fill in appropriate values for PROJECTID, JOB_NAME, and BUCKET_NAME.

gcs_readwrite.py


# coding:utf-8
import apache_beam as beam

# Specify the job name, project ID, and bucket name
PROJECTID = '<PROJECTID>'
JOB_NAME = '<JOB_NAME>'  # Enter the Dataflow job name
BUCKET_NAME = '<BUCKET_NAME>'

# Set the job name, project ID, and temporary file locations
options = beam.options.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(
    beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = JOB_NAME
gcloud_options.project = PROJECTID
gcloud_options.staging_location = 'gs://{}/staging'.format(BUCKET_NAME)
gcloud_options.temp_location = 'gs://{}/tmp'.format(BUCKET_NAME)

# Specify the maximum number of workers, machine type, etc. (all optional)
worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)
# worker_options.disk_size_gb = 100
# worker_options.max_num_workers = 2
# worker_options.num_workers = 2
# worker_options.machine_type = 'n1-standard-8'
# worker_options.zone = 'asia-northeast1-a'

# Switching the execution environment
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'  # Run on the local machine
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'  # Run on Dataflow

# Build and run the pipeline
p = beam.Pipeline(options=options)

(p | 'read' >> beam.io.ReadFromText('gs://{}/read.txt'.format(BUCKET_NAME))
    | 'write' >> beam.io.WriteToText('gs://{}/write.txt'.format(BUCKET_NAME))
 )
p.run().wait_until_finish()

3. GCS preparation

  1. Create a bucket with the name specified by BUCKET_NAME in the program above (or use the gsutil commands sketched below).
  2. Create folders called staging and tmp directly under the created bucket.
  3. Create a file called read.txt locally; any content is fine.
  4. Upload read.txt directly under the created bucket.
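
If you prefer to do the preparation from Cloud Shell instead of the console, roughly the same steps can be done with gsutil. This is a minimal sketch; the location asia-northeast1 and the file contents are just examples, and <BUCKET_NAME> must be replaced with your own globally unique bucket name.

# Create the bucket
gsutil mb -l asia-northeast1 gs://<BUCKET_NAME>

# Create read.txt locally with any content and upload it to the bucket root
echo "hello dataflow" > read.txt
gsutil cp read.txt gs://<BUCKET_NAME>/read.txt

GCS has a flat namespace, so the staging and tmp "folders" are really just object prefixes; Dataflow will write objects under them when the job runs, so creating them in the console as in step 2 is mainly for tidiness.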

4. Run locally

First, in the "Switching the execution environment" part of the program above, swap which line is commented out so that DirectRunner is the active one:

options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'  # Run on the local machine
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'  # Run on Dataflow

Then execute the following command.

python gcs_readwrite.py

This will create a file called write.txt-00000-of-00001 in your bucket.
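
To check the result from Cloud Shell, you can print the output file with gsutil (assuming the default single output shard):

gsutil cat gs://<BUCKET_NAME>/write.txt-00000-of-00001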

5. Deploy

First, switch the comments back in the "Switching the execution environment" part so that DataflowRunner is the active one:

# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'  # Run on the local machine
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'  # Run on Dataflow

Then execute the following command.

python gcs_readwrite.py

This will create a file called write.txt-00000-of-00001 in your bucket. If you open the job you created in the Dataflow GUI, you can see that the read and write steps are shown as "completed".
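
You can also check the job from Cloud Shell rather than the GUI, for example by listing the currently running jobs with gcloud (the exact output depends on your project and region):

gcloud dataflow jobs list --status=active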


Bonus (how to create a custom template)

To create a custom template, just add a line like the one below to the program and run it. You can choose the save destination and template name freely.

gcloud_options.template_location = 'gs://{}/template/template_name'.format(BUCKET_NAME)

Using the custom template is just a matter of: Create Job from Template -> Select Template -> Custom Template -> specify the GCS path of the template.
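
Running the program with template_location set stages the template to GCS instead of launching a job. Once it is staged, besides the console flow above, you can also launch it from Cloud Shell with gcloud; a minimal sketch, where the job name and region are placeholders to adjust:

gcloud dataflow jobs run my-template-job \
    --gcs-location gs://<BUCKET_NAME>/template/template_name \
    --region asia-northeast1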

References

Quick start using Python https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python?hl=ja

Specify the execution parameters of the pipeline https://cloud.google.com/dataflow/docs/guides/specifying-exec-params

Cloud Dataflow Super Primer https://qiita.com/hayatoy/items/987658490a69c7d24635
