Build a data processing environment with Google Cloud Dataflow + Pub/Sub

An introductory summary I put together when I had occasion to use Cloud Dataflow at work. I'm not an expert in the data processing field, so there may be mistakes in the description. If you spot any, please let me know :)

What is Cloud Dataflow?

A fully managed service on Google Cloud Platform for running the stream/batch data processing engine Apache Beam.

If that still sounds abstract: roughly decomposed, it is a managed environment that runs Apache Beam pipelines for you, covering both streaming and batch processing, without you having to provision or operate the worker infrastructure.

(*) As of 2018/02, streaming support is Java only.

By using Cloud Pub/Sub (another GCP service) as a message bus and combining it with Cloud IoT Core (MQTT protocol bridge) and data stores such as GCS and BigQuery, you can, for example, receive, convert, and integrate data sent from devices.

A rough usage model, taken from Google's sample diagram.

diagram-dataflow-2x.png

Cloud Dataflow in practice

It is hard to get a feel for it without actually using it, so this time we will build an example based on the following flow.

Untitled.png

Since I wanted to cover the integration with Pub/Sub, I used Cloud Pub/Sub as the entry point, but it is of course also possible to process data from BigQuery tables A and B and load the result into table C.
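For reference, a rough sketch of that BigQuery-to-BigQuery variant (my own illustration; the table names are placeholders, and the actual processing that combines tables A and B would go in the middle):

    // Sketch only: read rows from a BigQuery table instead of Pub/Sub,
    // apply your transforms, and write the result to another table.
    Pipeline p = Pipeline.create(options);
    p.apply(BigQueryIO.readTableRows().from("your_project:dataset.table_a"))
            // ... apply the transforms that combine / process the data here ...
            .apply(BigQueryIO.writeTableRows()
                    .to("your_project:dataset.table_c")
                    // table_c is assumed to exist already; otherwise pass withSchema(...) and CREATE_IF_NEEDED
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    p.run();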

How to create a development environment

Download the SDK with the following in pom.xml.

    <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>[2.2.0, 2.99)</version>
    </dependency>

For how to set up Maven and the like, please check the official documentation.

Write code for behavior settings

First, set the options of the pipeline to process.

        DataflowPipelineOptions options = PipelineOptionsFactory.create()
                .as(DataflowPipelineOptions.class);
        // Specify the GCP project you are using
        options.setProject("your gcp project");
        // Create and specify the GCS bucket Dataflow uses for staging
        options.setStagingLocation("gs://hoge/staging");
        // Create and specify the GCS bucket Dataflow uses for temporary files
        options.setTempLocation("gs://hoge/tmp");
        // Specify the runner. Use DataflowRunner to run on GCP, DirectRunner for local execution
        options.setRunner(DataflowRunner.class);
        // Enable streaming
        options.setStreaming(true);
        // Specify the job name (jobs with the same name cannot run at the same time)
        options.setJobName("sample");

The official documentation explains these options as well, so I'll only touch on the rough points.

(*) In my actual code I wanted to separate the settings from the code, so I used Java's ResourceBundle, but if you just want it to work, direct values like the above are fine.
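For reference, a minimal sketch of that ResourceBundle approach (the file name dataflow.properties and its key names are my own assumptions, not from the original code):

    import java.util.ResourceBundle;

    // src/main/resources/dataflow.properties (hypothetical):
    //   project=your gcp project
    //   stagingLocation=gs://hoge/staging
    //   tempLocation=gs://hoge/tmp
    //   jobName=sample
    ResourceBundle bundle = ResourceBundle.getBundle("dataflow");
    options.setProject(bundle.getString("project"));
    options.setStagingLocation(bundle.getString("stagingLocation"));
    options.setTempLocation(bundle.getString("tempLocation"));
    options.setJobName(bundle.getString("jobName"));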

Basic concepts that make up programming on Dataflow

When programming a Dataflow job, you basically deal with the following concepts: a Pipeline (the job itself), PCollections (the data sets flowing through it), transforms such as ParDo, and Pipeline I/O (connectors for reading from and writing to external systems).

You build a job by applying the transforms you wrote, together with the necessary Pipeline I/O, to the Pipeline.
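As a very small sketch (mine, not from the article) of how those concepts map to types in the Java SDK:

    // Pipeline: the whole job
    Pipeline p = Pipeline.create(options);
    // Pipeline I/O: reading produces a PCollection of input elements
    PCollection<String> messages =
            p.apply(PubsubIO.readStrings().fromSubscription("your pubsub subscription"));
    // Transform: each apply() turns one PCollection into another
    PCollection<TableRow> rows = messages.apply(ParDo.of(new BigQueryRowConverter()));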

Write the code from reading input to output

The three actions described in the figure above (read from Cloud Pub/Sub, convert the data, and write to BigQuery) are applied to the pipeline.

        // Create the Pipeline object (the job to be processed)
        Pipeline p = Pipeline.create(options);
        TableSchema schema = SampleSchemaFactory.create();
        // Apply the processing steps
        // Read data from the Pub/Sub subscription
        p.apply(PubsubIO.readStrings().fromSubscription("your pubsub subscription"))
        // Apply fixed 5-minute windows (optional)
                .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
        // Apply the conversion for the input from Pub/Sub (implementation shown later)
                .apply(ParDo.of(new BigQueryRowConverter()))
        // Set up the write to BigQuery
                .apply("WriteToBQ", BigQueryIO.writeTableRows()
                        // Specify the destination table (dataset:table format)
                        .to("dataset_name:table_name")
                        // Pass the schema of the destination table, defined as a TableSchema object
                        .withSchema(schema)
                        // Create the table if it does not exist (optional)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        // Append data to the table (optional)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        // Run
        p.run();

For a conversion using ParDo, you extend DoFn and implement everything from retrieving the input element to emitting the converted output in a method annotated with @ProcessElement.

    package com.mycompany.dataflow_sample.converter;

    import com.google.api.services.bigquery.model.TableRow;
    import com.google.gson.Gson;
    import com.mycompany.dataflow_sample.entity.SampleInputJson;
    import org.apache.beam.sdk.transforms.DoFn;

    public class BigQueryRowConverter extends DoFn<String, TableRow> {

        @ProcessElement
        public void processElement(ProcessContext dofn) throws Exception {
            // Receive the input
            String json = dofn.element();
            Gson gson = new Gson();
            // Convert the JSON string to an object
            SampleInputJson jsonObj = gson.fromJson(json, SampleInputJson.class);
            // Convert the contents of the JSON to a BigQuery TableRow
            TableRow output = new TableRow();
            TableRow attributesOutput = new TableRow();
            TableRow attr2Output = new TableRow();
            // Set the data on the output rows (field names must match the table schema)
            attributesOutput.set("attr1", jsonObj.attributes.attr1);
            attr2Output.set("prop1", jsonObj.attributes.attr2.prop1);
            attr2Output.set("prop2", jsonObj.attributes.attr2.prop2);

            attributesOutput.set("attr2", attr2Output);
            output.set("attributes", attributesOutput);
            output.set("name", jsonObj.name);
            output.set("ts", jsonObj.timeStamp / 1000);
            // Output
            dofn.output(output);
        }
    }
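The SampleInputJson entity class is not shown in this article; a minimal sketch matching the fields used above could look like this (the field layout is inferred from the converter, so treat it as an assumption):

    package com.mycompany.dataflow_sample.entity;

    // Sketch only: inferred from the fields referenced in BigQueryRowConverter.
    // Example message body it would parse (hypothetical):
    //   {"name":"sensor-1","timeStamp":1517443200000,
    //    "attributes":{"attr1":"foo","attr2":{"prop1":1,"prop2":"bar"}}}
    public class SampleInputJson {
        public String name;
        public long timeStamp;
        public Attributes attributes;

        public static class Attributes {
            public String attr1;
            public Attr2 attr2;
        }

        public static class Attr2 {
            public int prop1;
            public String prop2;
        }
    }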
The schema passed to withSchema above is built by SampleSchemaFactory:

    package com.mycompany.dataflow_sample.schema;

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.util.ArrayList;
    import java.util.List;

    public class SampleSchemaFactory {
        public static TableSchema create() {
            List<TableFieldSchema> fields;
            fields = new ArrayList<>();
            fields.add(new TableFieldSchema().setName("name").setType("STRING"));
            fields.add(new TableFieldSchema().setName("ts").setType("TIMESTAMP"));
            fields.add(new TableFieldSchema().setName("attributes").setType("RECORD")
                    .setFields(new ArrayList<TableFieldSchema>() {
                        {
                            add(new TableFieldSchema().setName("attr1").setType("STRING"));
                            add(new TableFieldSchema().setName("attr2").setType("RECORD")
                                .setFields(new ArrayList<TableFieldSchema>() {
                                    {
                                        add(new TableFieldSchema().setName("prop1").setType("INTEGER"));
                                        add(new TableFieldSchema().setName("prop2").setType("STRING"));
                                    }
                                })
                            );
                        }
                    })
            );
            TableSchema schema = new TableSchema().setFields(fields);

            return schema;
        }
    }

Like this.

Deploy / test

Simply build and run the Java code above, and the job is deployed to GCP.

dataflow.png

Send a message from the GCP console.

pubsub.png

If the data shows up in BigQuery, the pipeline is working :)

Check the log

You can check the logs from the detail screen by clicking the created job in the console. Logs are automatically forwarded to Stackdriver, so use that for monitoring.

detail.png

Behavior on error (when an exception occurs)

If an exception occurs while the Dataflow job is running, no ACK is returned to Pub/Sub, so the same data is delivered again.

Therefore:

- Even if processing fails in the load step to BigQuery, the Pub/Sub message is not discarded and is automatically retried, so there is no need to implement retries at the communication level.
- On the other hand, if the data itself is invalid, the job will keep producing errors, so proper handling is needed, such as validating the input and then routing bad records to an error destination, logging them, or discarding them (see the sketch below).
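As one example of such handling, here is a rough sketch (my own illustration, not from the original sample) of catching parse errors inside the DoFn so that invalid records are logged and dropped instead of being redelivered forever:

    package com.mycompany.dataflow_sample.converter;

    import com.google.api.services.bigquery.model.TableRow;
    import com.google.gson.Gson;
    import com.google.gson.JsonSyntaxException;
    import com.mycompany.dataflow_sample.entity.SampleInputJson;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch: a variant of BigQueryRowConverter that swallows invalid input instead of throwing.
    public class SafeBigQueryRowConverter extends DoFn<String, TableRow> {

        private static final Logger LOG = LoggerFactory.getLogger(SafeBigQueryRowConverter.class);

        @ProcessElement
        public void processElement(ProcessContext c) {
            String json = c.element();
            try {
                SampleInputJson jsonObj = new Gson().fromJson(json, SampleInputJson.class);
                TableRow output = new TableRow();
                output.set("name", jsonObj.name);
                output.set("ts", jsonObj.timeStamp / 1000);
                // (attribute fields omitted here for brevity)
                c.output(output);
            } catch (JsonSyntaxException | NullPointerException e) {
                // Invalid record: log it (or send it to an error table / dead-letter topic) and drop it,
                // so the message is ACKed and not redelivered indefinitely.
                LOG.error("Dropping invalid record: " + json, e);
            }
        }
    }

Instead of just logging, you could also emit such records via an additional output and write them to a separate error table.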

sample

The sample is here. I wanted the data split by day, so I made it create a daily table based on the date at execution time.
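One simple way to do that (a sketch of the idea, not the actual sample code; the destination is fixed when the job is launched) is to append the execution date to the destination table name:

    // Sketch: compute a date-suffixed table name when the pipeline is built,
    // so each deployment writes to that day's table
    // (uses java.text.SimpleDateFormat and java.util.Date).
    String daySuffix = new SimpleDateFormat("yyyyMMdd").format(new Date());
    String dailyTable = "dataset_name:table_name_" + daySuffix;

    // then, in the pipeline:
    //   .apply("WriteToBQ", BigQueryIO.writeTableRows().to(dailyTable) ... )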

reference

- Official sample code collection (GitHub)
- Apache Beam SDK
- Google official documentation
