[PYTHON] Pratique de création d'une plateforme d'analyse de données avec BigQuery et Cloud DataFlow (traitement de données)

Pratique de création d'une plateforme d'analyse de données avec BigQuery et Cloud DataFlow (préparation)

3. Traité à l'aide de Cloud Dataflow

La dernière fois que j'ai écrit que j'utiliserais Scio au lieu de Java et Python. Tout d'abord, j'ai essayé d'utiliser Java et Python pour m'entraîner. Je ne suis pas allé à Scio cette fois.

Ce que je veux faire ** Importez les données traitées dans BigQuery à partir de BigQuery contenant des données brutes ** est.

L'environnement est Python 2.7.13 + Apache Beam 2.5.0 Java 1.8.0_25 + Apache Beam 2.4.0

Lorsque j'ai été recherché sur Google, j'ai eu beaucoup de problèmes car de nombreux articles de la série Cloud Dataflow 1 ont été capturés. Même le document officiel est assez coincé dans le 1er système. Je pense qu'il était préférable de lire Apache Beam de toutes mes forces.

De plus, la documentation officielle de GCP a différentes versions en japonais et en anglais, et le japonais est souvent ancien. Je pense que vous pouvez être satisfait des extensions Chrome suivantes. GCP outdated docs checker

Traitement des données avec python

Document officiel Tout d'abord, vous pouvez comprendre le flux en effectuant l'exemple de comptage de mots du document officiel.

À partir des données de tweet d'origine qui ont été placées dans BigQuery la dernière fois Décompose les informations utilisateur et génère l'ID utilisateur, le nom d'utilisateur et le nom d'écran. Voici le programme d'exécution.

La table tweet.SiroTalk est l'entrée et la table tweet.SiroTalkPython3 est la sortie.

parseuser.py


from __future__ import absolute_import

import argparse
import logging
import re

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

import json
from datetime import datetime


def parse_user(element):
  tweet_id = element['tweet_id']
  ct = element['ct']
  full_text = element['full_text']
  user = element['user']

  user_json = json.loads(user)
  user_id = user_json['id']
  user_screen_name = user_json['screen_name']
  user_name = user_json['name']

  create_time = datetime.fromtimestamp(ct).strftime('%Y-%m-%d %H:%M:%S')

  return {
    'tweet_id': tweet_id,
    'create_time': create_time,
    'full_text': full_text,
    'user_id': user_id,
    'user_screen_name': user_screen_name,
    'user_name': user_name
  }


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      # '--runner=DataflowRunner',
      '--runner=DirectRunner',
      '--project=<project-id>',
      '--staging_location=<bucket_path>/staging',
      '--temp_location=<bucket_path>/temp',
      '--job_name=<job_name>',
  ])

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  with beam.Pipeline(options=pipeline_options) as p:
    query = 'SELECT tweet_id, ct, full_text, user FROM tweet.SiroTalk'
    (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='<project_name>', use_standard_sql=False, query=query))
        | 'modify' >> beam.Map(parse_user)
        | 'write' >> beam.io.Write(beam.io.BigQuerySink(
        'tweet.SiroTalkPython',
        schema='tweet_id:INTEGER, create_time:DATETIME, full_text:STRING, user_id:INTEGER, user_screen_name:STRING, user_name:STRING',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    )


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

Courir


$ python parseuser.py

En fait, il est plus général d'exécuter avec des paramètres, mais cette fois je l'ai codé en dur pour la pratique.

Les données sont traitées par parse_user (élément): L'élément contient une ligne du tableau. Configurez la forme que vous souhaitez générer ici et renvoyez-la.

Pour Python, je n'ai pas trouvé de moyen de définir la table partitionnée dans la sortie BigQuery. Si vous ne pouvez pas faire cela, la façon dont vous dépensez de l'argent lors de l'exécution des requêtes changera considérablement, ce qui pose un problème. De plus, seul python2 est pris en charge.

スクリーンショット 2018-08-05 17.41.07.png

Table traitée スクリーンショット 2018-08-05 17.54.09.png

Traitement des données avec Java

Document officiel

Java est tout aussi facile à faire à partir de l'exemple. En particulier, Java ne peut pas être créé avec un seul fichier comme python. Il peut être plus facile de démarrer en fonction de l'exemple de premier flux de données.

J'ai modifié l'exemple WordCount.java.

ParseUser.java


package com.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.*;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;

import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.json.JSONObject;

public class ParseUser {
    public interface ParseUserPipelineOptions extends GcpOptions {
//        @Description("BigQuery dataset name")
//        @Default.String("tweet")
//        String getDataset();
//        void setDataset(String dataset);
//
//        @Description("BigQuery input table name")
//        @Default.String("SiroTalkTest2")
//        String getInputTable();
//        void setInputTable(String table);

        @Description("BigQuery table schema file")
        @Default.String("schema.json")
        String getSchemaFile();
        void setSchemaFile(String schemaFile);

//        @Description("BigQuery output table name")
//        @Default.String("SiroTalkJava")
//        String getOutputTable();
//        void setOutputTable(String outputTable);
    }

    public static class ParseUserFn extends DoFn<TableRow, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            TableRow tweet = c.element();
            Long tweetId = Long.parseLong(tweet.get("tweet_id").toString());
            Long ct = Long.parseLong(tweet.get("ct").toString());
            String fullText = tweet.get("full_text").toString();
            String user = tweet.get("user").toString();

            Instant instant = Instant.ofEpochSecond(ct);
            String tweetDateString = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Tokyo")));

            JSONObject userJson = new JSONObject(user);
            Long userId = Long.parseLong(userJson.get("id").toString());
            String screenName = userJson.get("screen_name").toString();
            String name = userJson.get("name").toString();

            TableRow outputRow = new TableRow()
                    .set("tweet_id", tweetId)
                    .set("ct", ct)
                    .set("create_time", tweetDateString)
                    .set("full_text", fullText)
                    .set("user_id", userId)
                    .set("user_screen_name", screenName)
                    .set("user_name", name);
            c.output(outputRow);
        }
    }

    public static void main(String[] args) throws IOException {
        final ParseUserPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ParseUserPipelineOptions.class);

        final String tableSchemaJson = new String(Files.readAllBytes(Paths.get(options.getSchemaFile())), Charset.forName("UTF-8"));
        final TableSchema tableSchema = new TableSchema().setFields(new ObjectMapper().reader().forType(new TypeReference<List<TableFieldSchema>>() {}).readValue(tableSchemaJson));

        final Pipeline p = Pipeline.create(options);
        PCollection<TableRow> inputRows = p.apply("ReadFromBQ", BigQueryIO.readTableRows()
                .fromQuery("SELECT tweet_id, ct, full_text, user FROM [<project_name>:tweet.SiroTalk]"));
        PCollection<TableRow> outputRows = inputRows.apply(ParDo.of(new ParseUserFn()));

        outputRows.apply("WriteToBQ", BigQueryIO.writeTableRows()
                .to("<project_name>:tweet.SiroTalkJava")
                .withSchema(tableSchema)
                .withTimePartitioning(new TimePartitioning().setField("ct"))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY));
        p.run().waitUntilFinish();
    }
}

schema.json


[
  {
    "name": "tweet_id",
    "type": "INTEGER"
  },
  {
    "name": "ct",
    "type": "TIMESTAMP"
  },
  {
    "name": "create_time",
    "type": "DATETIME"
  },
  {
    "name": "full_text",
    "type": "STRING"
  },
  {
    "name": "user_id",
    "type": "INTEGER"
  },
  {
    "name": "user_screen_name",
    "type": "STRING"
  },
  {
    "name": "user_name",
    "type": "STRING"
  }
]

Courir


mvn compile exec:java \
      -Dexec.mainClass=com.example.ParseUser \
      -Dexec.args="--project=<project-id> \
      --stagingLocation=<bucket_path>/staging/ \
      --runner=DataflowRunner"

Le schéma de la table de sortie est écrit dans schema.json. Il est placé directement sous le répertoire à exécuter. Encore une fois, si vous utilisez correctement ParseUserPipelineOptions, vous pouvez utiliser les arguments au moment de l'exécution, mais pour le moment, le codage en dur.

Je tire une requête similaire à python et la traite avec ParseUserFn. La manière d'écrire ici est unique et je l'imite presque.

スクリーンショット 2018-08-05 17.39.46.png

Table traitée スクリーンショット 2018-08-05 17.53.26.png

Partition de sortie BigQuery, mais pour Java

.Spécifiez à l'aide de l'option withTimePartitioning.


 En effet, vous ne pouvez spécifier que le type TIMESTAMP ou le type DATE
 Cette fois, j'ai mis le ct que j'ai mis dans Long comme TIMESTAMP et en ai fait une colonne de partition.
 `` Il s'agit d'une table fractionnée lorsque vous insérez un BigQuery simple. Il y avait un pop appelé `` ''
 Il n'est pas venu avec cette façon de dire.
 Cependant, la capacité de traitement au moment de WHERE ct a été réduite correctement, donc ça devrait aller ...
 Est-ce la différence entre _PARTITIONTIME et column?

 Ancien tableau
 <img width="303" alt="スクリーンショット 2018-08-05 17.43.18.png " src="https://qiita-image-store.s3.amazonaws.com/0/265518/ceef5162-f013-a8d4-7048-805f4f3e25b3.png ">
 Table de post-traitement
 <img width="222" alt="スクリーンショット 2018-08-05 17.43.24.png " src="https://qiita-image-store.s3.amazonaws.com/0/265518/a0ca872d-0e44-9d82-182b-684175475089.png ">



 Je l'ai fait avec Apache Beam version 2.4.0, mais lorsque je l'ai mis à niveau vers la dernière version 2.5.0, il a cessé de fonctionner.
 Je ne l'ai pas étudié en détail, mais on m'a dit qu'il n'y avait pas de fonction.

# 4. Rendre la table traitée visible sur le tableau de bord ...
 Ensuite ... peut-être que je ne peux pas le faire tout de suite

 Cette fois, j'ai d'abord essayé le simple traitement des données.
 Il existe d'autres façons d'utiliser le pipeline, de sorte qu'il y aura probablement beaucoup de choses à faire.
 C'était difficile car il n'y avait aucune information ...

 Je n'ai encore rien fait de compliqué, donc je ne peux pas dire lequel est le meilleur, python ou Java.
 python était plus facile à faire.
 Jusqu'ici cette fois pour le moment.


Recommended Posts

Pratique de création d'une plateforme d'analyse de données avec BigQuery et Cloud DataFlow (traitement de données)
Obtenez une grande quantité de données Twitter de Starba avec python et essayez l'analyse de données Partie 1
Pratique de l'analyse de données par Python et pandas (Tokyo COVID-19 data edition)
J'ai 0 ans d'expérience en programmation et je défie le traitement des données avec python
Exemple de traitement efficace des données avec PANDAS
Analyse des données de pratique Python Résumé de l'apprentissage que j'ai atteint environ 10 avec 100 coups
Exécuter l'API de Cloud Pak for Data Analysis Project Job avec des variables d'environnement
Créer un environnement d'analyse de données qui relie l'authentification GitHub et Django avec JupyterHub
Flux de création d'un environnement virtuel avec Anaconda
Traitement et jugement de la collecte du plan d'analyse des données (partie 1)
Environnement enregistré pour l'analyse des données avec Python
Traitement et jugement de la collecte du plan d'analyse des données (partie 2)
Traitement d'image avec Python (j'ai essayé de le binariser en art mosaïque 0 et 1)
Organisation des procédures de base pour l'analyse des données et le traitement statistique (4)
Analyse des données financières par pandas et leur visualisation (2)
Traitement pleine largeur et demi-largeur des données CSV en Python
Exécutez un pipeline de machine learning avec Cloud Dataflow (Python)
Analyse des données financières par pandas et leur visualisation (1)
Organisation des procédures de base pour l'analyse des données et le traitement statistique (2)
Défiez l'analyse des composants principaux des données textuelles avec Python
Histoire de l'analyse d'image du fichier PDF et de l'extraction de données
Analyse des données de mesure (2) -Hydrobacter et raccord, recommandation lmfit-
Jetez un œil au profilage et au vidage avec Dataflow
Vue d'ensemble du traitement du langage naturel et de son prétraitement des données
Créez un arbre de décision à partir de zéro avec Python et comprenez-le (3. Bibliothèque d'analyse de données édition Pandas)
Une histoire de lecture d'un livre d'images en synthétisant la voix avec l'API COTOHA et l'API Cloud Vision
"Analyse des séries chronologiques de mesure des données économiques et financières" Résolution du problème de fin de chapitre avec Python
Essayez l'analyse morphologique et la chaîne de Markov avec Django (Ari avec beaucoup de marge d'amélioration)
Une histoire qui contribue à une nouvelle analyse corona à l'aide d'un essai gratuit de Google Cloud Platform
Analyse de données avec Python
Créez un environnement d'analyse de données avec Kedro + MLflow + Github Actions
Un diagramme de réseau a été créé avec les données du COVID-19.
À la suite du montage et du réglage avec POH! Lite
Effectuer une analyse isocurrent des canaux en eau libre avec Python et matplotlib
Créez des applications, enregistrez des données et partagez-les avec un seul e-mail
Détecter les objets d'une couleur et d'une taille spécifiques avec Python
Une collection de méthodes utilisées lors de l'agrégation de données avec des pandas
Envoyez et recevez des données avec MQTT via Watson IoT Platform
Un débutant en Python a d'abord essayé une analyse rapide et facile des données météorologiques des 10 dernières années.