[PYTHON] Use Cloud Dataflow to dynamically change the destination according to the value of the data and save it in GCS

Introduction

This is the day 23 article of the Classi Advent Calendar 2019.

Hello, this is @tomoyanamekawa from the Data AI team at Classi. I usually build data analysis platforms on GCP.

Recently I needed to split data in BigQuery into separate files according to the values it contains and save them in GCS, and Cloud Dataflow helped me do that. Other people seem to need the same thing, and there were few implementation examples in Python, so I am summarizing it here.

Goal

Run a daily process that exports a specific BigQuery table to Google Cloud Storage (GCS), changing the destination directory according to the value of a certain column. The file format is JSON.

Example

Given the reservations table in BigQuery:

[Figure: reservations table]

I want to save it to GCS in separate directories for each date and shop_id, like this:

[Figure: reservations_GCS]
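To make the target layout concrete, here is a minimal sketch in plain Python (no Dataflow involved) that maps each row to the directory it should end up in. The helper names target_prefix and group_by_prefix, the sample rows, and the guest column are hypothetical and exist only for illustration; the date format and the shop_id_ prefix follow the template shown later in this article.

```python
import datetime
from collections import defaultdict


def target_prefix(row, export_date, bucket="your_bucket"):
    """Return the GCS directory a row should be written under."""
    day = export_date.strftime("%Y%m%d")
    return f"gs://{bucket}/{day}/shop_id_{row['shop_id']}/"


def group_by_prefix(rows, export_date):
    """Group rows by their destination directory."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[target_prefix(row, export_date)].append(row)
    return dict(grouped)


# Hypothetical sample rows; only shop_id is taken from the article.
rows = [
    {"shop_id": 1, "guest": "a"},
    {"shop_id": 2, "guest": "b"},
    {"shop_id": 1, "guest": "c"},
]

layout = group_by_prefix(rows, datetime.date(2019, 12, 23))
for prefix in sorted(layout):
    print(prefix, len(layout[prefix]))
# gs://your_bucket/20191223/shop_id_1/ 2
# gs://your_bucket/20191223/shop_id_2/ 1
```

This only describes the desired result. Doing the same thing at scale, in parallel, is what the rest of the article uses Cloud Dataflow for.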

Overall architecture

[Figure: completed architecture]

Environment

What is Cloud Dataflow?

Cloud Dataflow is a GCP service for running ETL processing without managing servers. Apache Beam runs behind the scenes, so you can think of it as a serverless way to use Apache Beam. Because it processes data in parallel, it can handle even large-scale data quickly.

It supports both stream and batch processing; this article uses batch processing. For more information, see the official page.

If you just want to get something working first, I recommend the procedure in Mr. Yuzu Taso's presentation slides (that is how I caught up myself).

Create custom template

Cloud Dataflow uses what is called a "template" to define an ETL process. For common processing, the templates provided by Google let you set things up easily from the GUI. They cannot do what I want this time, however, so I will create a custom template.

Templates can be written in Java or Python. I use Python here, but Java has more features and documentation, so if you or your team can write Java and maintaining it is not a problem, I think Java is the better choice.

Here are the contents of the custom template.

test_template.py


import datetime
import json

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions

        
class JsonSink(fileio.TextSink):
    """Sink that writes each record as one JSON document per line."""

    def write(self, record):
        self._fh.write(json.dumps(record).encode('utf8'))
        self._fh.write('\n'.encode('utf8'))


if __name__ == '__main__':
    # Note: this is evaluated when the template is built, not when a job is launched.
    now = datetime.datetime.now().strftime('%Y%m%d')
    project_id = 'your_project'
    dataset_name = 'your_dataset'
    table_name = 'your_table'
    bucket_name = 'your_bucket'

    # Pipeline options
    pipeline_options = PipelineOptions()
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project_id
    google_cloud_options.job_name = 'myjob'
    google_cloud_options.staging_location = f'gs://{bucket_name}/staging'
    google_cloud_options.temp_location = f'gs://{bucket_name}/temp'
    google_cloud_options.template_location = f'gs://{bucket_name}/templates/test_template'
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
        
    # Build the pipeline
    pipeline = beam.Pipeline(options=pipeline_options)
    (pipeline 
        | 'read' >> beam.io.Read(beam.io.BigQuerySource(
            project=project_id, 
            use_standard_sql=True, 
            query=f'select * from `{project_id}.{dataset_name}.{table_name}`'
        ))
        | 'write' >> beam.io.fileio.WriteToFiles(
            path=f'gs://{bucket_name}/{now}',
            destination=lambda record: f"shop_id_{record['shop_id']}/",
            sink=JsonSink(),
            file_naming=beam.io.fileio.destination_prefix_naming()
        )
    )

    pipeline.run()

The key point is the dynamic destinations feature of WriteToFiles. Each record is passed to the destination callable as record, so record['shop_id'] can be used to change the destination (the file name prefix of the output) for each record.

The template has to be placed on GCS, so run the following command.
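The interplay between the destination callable and the sink can be tried locally without Beam. The following is only a simulation under stated assumptions: InMemoryJsonSink is a hypothetical stand-in that exposes the same open/write/flush methods as a Beam file sink, write_locally is a hypothetical helper, and the sample records are invented. Real WriteToFiles additionally shards the output and chooses the final file names itself, which this sketch does not reproduce.

```python
import io
import json


class InMemoryJsonSink:
    """Hypothetical stand-in with the open/write/flush methods of a file sink."""

    def open(self, fh):
        self._fh = fh

    def write(self, record):
        # Same idea as JsonSink in the template: one JSON document per line.
        self._fh.write(json.dumps(record).encode("utf8"))
        self._fh.write(b"\n")

    def flush(self):
        self._fh.flush()


def destination(record):
    """Per-record destination, identical to the lambda in the template."""
    return f"shop_id_{record['shop_id']}/"


def write_locally(records):
    """Route each record to an in-memory file chosen by destination()."""
    handles, sinks = {}, {}
    for record in records:
        dest = destination(record)
        if dest not in sinks:
            handles[dest] = io.BytesIO()
            sinks[dest] = InMemoryJsonSink()
            sinks[dest].open(handles[dest])
        sinks[dest].write(record)
    for sink in sinks.values():
        sink.flush()
    return {dest: fh.getvalue().decode("utf8") for dest, fh in handles.items()}


# Invented sample records for illustration.
records = [
    {"shop_id": 1, "guest": "a"},
    {"shop_id": 2, "guest": "b"},
    {"shop_id": 1, "guest": "c"},
]
files = write_locally(records)
print(sorted(files))
# ['shop_id_1/', 'shop_id_2/']
```

Each value in files is newline-delimited JSON, so the records for shop 1 end up together in one output and the record for shop 2 in another.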

python -m test_template

The template is then placed at the location specified by google_cloud_options.template_location. You can also specify the template location at run time.

Run it daily

Cloud Dataflow itself has no scheduler, so a daily run has to be triggered from outside. Here I make it serverless with Cloud Scheduler + Cloud Pub/Sub + Cloud Functions.

[Figure: scheduler architecture]

Register the following script in Cloud Functions. It launches the custom template.

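from googleapiclient.discovery import build


def main(data, context):
    # Job names may contain only lowercase letters, digits, and hyphens.
    job = 'my-job'
    dataflow = build('dataflow', 'v1b3')
    # Launch the template staged on GCS; the job name goes in the request body.
    request = dataflow.projects().templates().launch(
        projectId='your_project',
        gcsPath='gs://your_bucket/templates/test_template',
        body={'jobName': job}
    )
    response = request.execute()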

The Cloud Functions trigger is Pub/Sub. A function triggered by Pub/Sub has to accept two arguments, which is why the entry point is defined as main(data, context).

All that remains is to create the Pub/Sub topic that acts as the trigger and publish to it daily from Cloud Scheduler.

If you already run Cloud Composer or a server and schedule jobs with another workflow engine or cron, you can launch the custom template with the gcloud command below.
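As a rough illustration of the first argument: for a Pub/Sub-triggered background function, the message payload arrives base64-encoded under the "data" key of the event dictionary. The sketch below decodes it so that Cloud Scheduler could pass settings in the message body. The parse_event helper, the job_name payload key, and the fallback value are assumptions for illustration, not part of the original function.

```python
import base64
import json


def parse_event(event):
    """Decode a JSON payload from a Pub/Sub event dict; empty dict if absent."""
    raw = event.get("data")
    if not raw:
        return {}
    return json.loads(base64.b64decode(raw).decode("utf8"))


def main(data, context):
    # "job_name" is a hypothetical key that the publisher would have to send.
    params = parse_event(data)
    job = params.get("job_name", "myjob")
    # The Dataflow launch call from the article would go here, using job.
    return job


# Local check with a hand-built event; no GCP services are contacted.
payload = json.dumps({"job_name": "daily-export"}).encode("utf8")
event = {"data": base64.b64encode(payload).decode("ascii")}
print(main(event, None))
# daily-export
```

The second argument, context, carries event metadata and is not used in this sketch.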

gcloud dataflow jobs run my-job \
        --gcs-location gs://your_bucket/templates/test_template \
        --region=asia-northeast1

Conclusion

Cloud Dataflow is very convenient, because building a system yourself that can process data at this scale in a short time would be daunting. It is a little expensive, though, so choose where you use it carefully to avoid ending up with a Cloud Dataflow bill of xx million yen.

Tomorrow's article is by @tetsuya0617. Looking forward to it!
