[PYTHON] Practice of creating a data analysis platform with BigQuery and Cloud DataFlow (data processing)

Practice of creating a data analysis platform with BigQuery and Cloud DataFlow (preparation)

3. Process the data with Cloud Dataflow

Last time I wrote that I would try Scio instead of Java or Python, but for practice I started with Java and Python first. I didn't get to Scio this time.

What I want to do is **read the raw data already stored in BigQuery, process it, and put the processed data back into BigQuery**.

The environment:

Python 2.7.13 + Apache Beam 2.5.0
Java 1.8.0_25 + Apache Beam 2.4.0

When I googled around, I had a lot of trouble because most of the hits were articles about the Cloud Dataflow 1.x SDK. Even the official documentation still leans heavily on 1.x. In the end, the best approach was to read the Apache Beam documentation head-on.

Also, the Japanese and English versions of the official GCP documentation can differ, and the Japanese version is often out of date. The following Chrome extension can help: GCP outdated docs checker

Data processing with Python

Official documentation

First, you can get a feel for the overall flow by running the WordCount sample from the official documentation.
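
To give an idea of that flow, here is a condensed sketch of what the WordCount sample boils down to (my own shortened version, assuming a local text file and the DirectRunner, not the exact official code):


# A condensed sketch of the official WordCount flow: read text, split into
# words, count them, and write the counts out. Paths are placeholders.
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run_wordcount(input_path, output_path):
  with beam.Pipeline(options=PipelineOptions(['--runner=DirectRunner'])) as p:
    (p
     | 'read' >> beam.io.ReadFromText(input_path)
     | 'split' >> beam.FlatMap(lambda line: re.findall(r"[A-Za-z']+", line))
     | 'pair' >> beam.Map(lambda word: (word, 1))
     | 'count' >> beam.CombinePerKey(sum)
     | 'format' >> beam.Map(lambda wc: '%s: %d' % (wc[0], wc[1]))
     | 'write' >> beam.io.WriteToText(output_path))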

From the raw tweet data I loaded into BigQuery last time, the pipeline extracts the user information and outputs the user ID, user name, and screen name. The following is the full runnable program.

The tweet.SiroTalk table is the input and the tweet.SiroTalkPython table is the output.

parseuser.py


from __future__ import absolute_import

import argparse
import json
import logging
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def parse_user(element):
  # element is one row of the input table, as a dict keyed by column name.
  tweet_id = element['tweet_id']
  ct = element['ct']
  full_text = element['full_text']
  user = element['user']

  # The user column holds the whole Twitter user object as a JSON string.
  user_json = json.loads(user)
  user_id = user_json['id']
  user_screen_name = user_json['screen_name']
  user_name = user_json['name']

  # ct is a Unix timestamp; format it as a string BigQuery accepts for DATETIME.
  create_time = datetime.fromtimestamp(ct).strftime('%Y-%m-%d %H:%M:%S')

  return {
    'tweet_id': tweet_id,
    'create_time': create_time,
    'full_text': full_text,
    'user_id': user_id,
    'user_screen_name': user_screen_name,
    'user_name': user_name
  }


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      # '--runner=DataflowRunner',
      '--runner=DirectRunner',
      '--project=<project-id>',
      '--staging_location=<bucket_path>/staging',
      '--temp_location=<bucket_path>/temp',
      '--job_name=<job_name>',
  ])

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  with beam.Pipeline(options=pipeline_options) as p:
    # Read the raw tweets from BigQuery, reshape each row with parse_user,
    # and write the result back to another BigQuery table.
    query = 'SELECT tweet_id, ct, full_text, user FROM tweet.SiroTalk'
    (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='<project_name>', use_standard_sql=False, query=query))
        | 'modify' >> beam.Map(parse_user)
        | 'write' >> beam.io.Write(beam.io.BigQuerySink(
            'tweet.SiroTalkPython',
            schema='tweet_id:INTEGER, create_time:DATETIME, full_text:STRING, user_id:INTEGER, user_screen_name:STRING, user_name:STRING',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    )


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

Run


$ python parseuser.py

In practice it is more common to pass these settings as runtime parameters, but this time I hard-coded them for practice.
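
If you do want to parameterize it, a minimal sketch might look like the following (the --input_table and --output_table option names are my own invention, not part of the original program):


# A sketch of passing settings at runtime instead of hard-coding them.
def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input_table', default='tweet.SiroTalk')
  parser.add_argument('--output_table', default='tweet.SiroTalkPython')
  known_args, pipeline_args = parser.parse_known_args(argv)

  # Runner, project, staging location, etc. stay in pipeline_args and are taken
  # from the command line, e.g.:
  #   python parseuser.py --runner=DataflowRunner --project=<project-id> \
  #       --staging_location=<bucket_path>/staging --temp_location=<bucket_path>/temp
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  with beam.Pipeline(options=pipeline_options) as p:
    query = 'SELECT tweet_id, ct, full_text, user FROM %s' % known_args.input_table
    (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(use_standard_sql=False, query=query))
        | 'modify' >> beam.Map(parse_user)
        | 'write' >> beam.io.Write(beam.io.BigQuerySink(
            known_args.output_table,
            schema='tweet_id:INTEGER, create_time:DATETIME, full_text:STRING, '
                   'user_id:INTEGER, user_screen_name:STRING, user_name:STRING',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))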

The actual processing happens in parse_user(element). element holds one row of the table as a dict; build the row in the shape you want to output and return it.
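
For example, a made-up row like the one below would be reshaped as shown in the comment (the exact create_time depends on the machine's local timezone, and the key order may differ):


# A made-up input row, only to illustrate the reshaping done by parse_user.
sample_row = {
    'tweet_id': 1025000000000000000,
    'ct': 1533448867,
    'full_text': 'hello',
    'user': '{"id": 12345, "screen_name": "example_user", "name": "Example"}',
}
print(parse_user(sample_row))
# {'tweet_id': 1025000000000000000, 'create_time': '2018-08-05 ...',
#  'full_text': 'hello', 'user_id': 12345,
#  'user_screen_name': 'example_user', 'user_name': 'Example'}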

With Python, I couldn't find a way to make the BigQuery output table a partitioned table. Without partitioning, the amount you pay when querying the table can change considerably, which is a problem. Also, only Python 2 was supported at the time.
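
For reference, newer Beam Python SDKs (not the 2.5.0 used here) let beam.io.WriteToBigQuery take additional_bq_parameters, which can request partitioning; a sketch under that assumption:


# A sketch only: relies on beam.io.WriteToBigQuery and its additional_bq_parameters
# argument from newer Beam Python SDKs, which were not available to me on 2.5.0.
def write_partitioned(rows):
  # rows is the PCollection coming out of the 'modify' step above.
  return (rows | 'write' >> beam.io.WriteToBigQuery(
      'tweet.SiroTalkPython',
      schema='tweet_id:INTEGER, create_time:DATETIME, full_text:STRING, '
             'user_id:INTEGER, user_screen_name:STRING, user_name:STRING',
      # Ingestion-time (daily) partitioning; column-based partitioning would
      # instead need a TIMESTAMP or DATE field.
      additional_bq_parameters={'timePartitioning': {'type': 'DAY'}},
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))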


Table after processing (screenshot)

Data processing with Java

Official documentation

Getting started in Java is also easy from the sample, though unlike Python a Java pipeline can't live in just one file. It may be easiest to start from the first-dataflow sample project.

I modified the sample WordCount.java.

ParseUser.java


package com.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.*;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;

import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.json.JSONObject;

public class ParseUser {
    public interface ParseUserPipelineOptions extends GcpOptions {
//        @Description("BigQuery dataset name")
//        @Default.String("tweet")
//        String getDataset();
//        void setDataset(String dataset);
//
//        @Description("BigQuery input table name")
//        @Default.String("SiroTalkTest2")
//        String getInputTable();
//        void setInputTable(String table);

        @Description("BigQuery table schema file")
        @Default.String("schema.json")
        String getSchemaFile();
        void setSchemaFile(String schemaFile);

//        @Description("BigQuery output table name")
//        @Default.String("SiroTalkJava")
//        String getOutputTable();
//        void setOutputTable(String outputTable);
    }

    public static class ParseUserFn extends DoFn<TableRow, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            TableRow tweet = c.element();
            Long tweetId = Long.parseLong(tweet.get("tweet_id").toString());
            Long ct = Long.parseLong(tweet.get("ct").toString());
            String fullText = tweet.get("full_text").toString();
            String user = tweet.get("user").toString();

            // ct is a Unix timestamp in seconds; format it as JST for the create_time column.
            Instant instant = Instant.ofEpochSecond(ct);
            String tweetDateString = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Tokyo")));

            // The user column holds the whole Twitter user object as a JSON string.
            JSONObject userJson = new JSONObject(user);
            Long userId = Long.parseLong(userJson.get("id").toString());
            String screenName = userJson.get("screen_name").toString();
            String name = userJson.get("name").toString();

            TableRow outputRow = new TableRow()
                    .set("tweet_id", tweetId)
                    .set("ct", ct)
                    .set("create_time", tweetDateString)
                    .set("full_text", fullText)
                    .set("user_id", userId)
                    .set("user_screen_name", screenName)
                    .set("user_name", name);
            c.output(outputRow);
        }
    }

    public static void main(String[] args) throws IOException {
        final ParseUserPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ParseUserPipelineOptions.class);

        final String tableSchemaJson = new String(Files.readAllBytes(Paths.get(options.getSchemaFile())), Charset.forName("UTF-8"));
        final TableSchema tableSchema = new TableSchema().setFields(new ObjectMapper().reader().forType(new TypeReference<List<TableFieldSchema>>() {}).readValue(tableSchemaJson));

        final Pipeline p = Pipeline.create(options);
        PCollection<TableRow> inputRows = p.apply("ReadFromBQ", BigQueryIO.readTableRows()
                .fromQuery("SELECT tweet_id, ct, full_text, user FROM [<project_name>:tweet.SiroTalk]"));
        PCollection<TableRow> outputRows = inputRows.apply(ParDo.of(new ParseUserFn()));

        outputRows.apply("WriteToBQ", BigQueryIO.writeTableRows()
                .to("<project_name>:tweet.SiroTalkJava")
                .withSchema(tableSchema)
                .withTimePartitioning(new TimePartitioning().setField("ct"))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY));
        p.run().waitUntilFinish();
    }
}

schema.json


[
  {
    "name": "tweet_id",
    "type": "INTEGER"
  },
  {
    "name": "ct",
    "type": "TIMESTAMP"
  },
  {
    "name": "create_time",
    "type": "DATETIME"
  },
  {
    "name": "full_text",
    "type": "STRING"
  },
  {
    "name": "user_id",
    "type": "INTEGER"
  },
  {
    "name": "user_screen_name",
    "type": "STRING"
  },
  {
    "name": "user_name",
    "type": "STRING"
  }
]

Run


mvn compile exec:java \
      -Dexec.mainClass=com.example.ParseUser \
      -Dexec.args="--project=<project-id> \
      --stagingLocation=<bucket_path>/staging/ \
      --runner=DataflowRunner"

The schema of the output table is written in schema.json, placed directly under the directory you run the command from. Again, if you wire up ParseUserPipelineOptions properly you can pass these values as runtime arguments, but for now they are hard-coded.

As in the Python version, I run a similar query and process each row with ParseUserFn. The way this is written is fairly specific to Beam, and I mostly imitated the sample.


Table after processing (screenshot)

As for partitioning the BigQuery output, in Java you specify it with the .withTimePartitioning option. Because only a TIMESTAMP or DATE column can be used for this, I kept ct (the Long epoch seconds) as a TIMESTAMP column and made it the partition column. When you create a partitioned table in BigQuery the normal way, a pop-up about the partitioned table appears in the console, but it did not appear with this way of loading. Still, the amount of data processed when filtering with WHERE ct was properly reduced, so it should be fine... Perhaps this is the difference between _PARTITIONTIME and a partition column.
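
One way to check the pruning (not something from the run above, just a sketch using the google-cloud-bigquery client) is a dry-run query, comparing total_bytes_processed with and without the WHERE ct filter:


# A sketch: use a dry run to see how many bytes a query would scan,
# with and without the partition filter on ct.
from google.cloud import bigquery

client = bigquery.Client(project='<project-id>')
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)

sql = """
SELECT user_name FROM `<project-id>.tweet.SiroTalkJava`
WHERE ct >= TIMESTAMP('2018-08-01')
"""
job = client.query(sql, job_config=job_config)
print('Bytes processed: %d' % job.total_bytes_processed)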

Former table (screenshot)
Post-processing table (screenshot)



I ran this with Apache Beam 2.4.0, but when I upgraded to the latest 2.5.0 it stopped working. I haven't investigated in detail, but it failed with an error saying a method did not exist.

4. Make the processed table visible on a dashboard...

That's next... though I may not get to it right away.

This time I tried some simple data processing first. Pipelines can be used in other ways too, so there is probably a lot more that can be done. It was hard going because there was so little information...

I haven't done anything complicated yet, so I can't say whether Python or Java is better. Python was easier to get going with. That's all for this time.

