[PYTHON] Praxis der Erstellung einer Datenanalyseplattform mit BigQuery und Cloud DataFlow (Datenverarbeitung)

Übung zum Erstellen einer Datenanalyseplattform mit BigQuery und Cloud DataFlow (Vorbereitung)

3. Verarbeitet mit Cloud Dataflow

Das letzte Mal habe ich geschrieben, dass ich Scio anstelle von Java und Python verwenden würde. Zuerst habe ich versucht, Java und Python zum Üben zu verwenden. Diesmal bin ich nicht zu Scio gegangen.

Was ich machen will; was ich vorhabe zu tun ** Verarbeitete Daten aus BigQuery, die Rohdaten enthalten, in BigQuery einfügen ** ist.

Die Umwelt ist Python 2.7.13 + Apache Beam 2.5.0 Java 1.8.0_25 + Apache Beam 2.4.0

Als ich gegoogelt wurde, hatte ich große Probleme, weil viele Artikel der Cloud Dataflow 1-Serie abgefangen wurden. Sogar das offizielle Dokument steckt ziemlich fest im 1. System.

GCP outdated docs checker

Außerdem enthält die offizielle GCP-Dokumentation verschiedene Versionen in Japanisch und Englisch, und Japanisch ist häufig alt. Ich denke, Sie werden sich freuen, wenn Sie die folgenden Chrome-Erweiterungen hinzufügen. Datenverarbeitung mit Python Offizielles Dokument Zunächst können Sie den Ablauf anhand der Beispielwortanzahl des offiziellen Dokuments verstehen. Aus den ursprünglichen Tweet-Daten, die beim letzten Mal in BigQuery gespeichert wurden Zerlegt Benutzerinformationen und gibt Benutzer-ID, Benutzernamen und Bildschirmnamen aus. Das folgende ist das Ausführungsprogramm.

Die tweet.SiroTalk-Tabelle ist die Eingabe und die tweet.SiroTalkPython3-Tabelle ist die Ausgabe.

parseuser.py


from __future__ import absolute_import

import argparse
import logging
import re

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

import json
from datetime import datetime


def parse_user(element):
  tweet_id = element['tweet_id']
  ct = element['ct']
  full_text = element['full_text']
  user = element['user']

  user_json = json.loads(user)
  user_id = user_json['id']
  user_screen_name = user_json['screen_name']
  user_name = user_json['name']

  create_time = datetime.fromtimestamp(ct).strftime('%Y-%m-%d %H:%M:%S')

  return {
    'tweet_id': tweet_id,
    'create_time': create_time,
    'full_text': full_text,
    'user_id': user_id,
    'user_screen_name': user_screen_name,
    'user_name': user_name
  }


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      # '--runner=DataflowRunner',
      '--runner=DirectRunner',
      '--project=<project-id>',
      '--staging_location=<bucket_path>/staging',
      '--temp_location=<bucket_path>/temp',
      '--job_name=<job_name>',
  ])

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  with beam.Pipeline(options=pipeline_options) as p:
    query = 'SELECT tweet_id, ct, full_text, user FROM tweet.SiroTalk'
    (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='<project_name>', use_standard_sql=False, query=query))
        | 'modify' >> beam.Map(parse_user)
        | 'write' >> beam.io.Write(beam.io.BigQuerySink(
        'tweet.SiroTalkPython',
        schema='tweet_id:INTEGER, create_time:DATETIME, full_text:STRING, user_id:INTEGER, user_screen_name:STRING, user_name:STRING',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    )


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

Lauf


$ python parseuser.py

Eigentlich ist es allgemeiner, mit Parametern auszuführen, aber dieses Mal habe ich es für die Praxis fest codiert.

Die Daten werden von parse_user (Element) verarbeitet: Element enthält eine Zeile der Tabelle. Konfigurieren Sie die Form, die Sie hier ausgeben möchten, und geben Sie sie zurück.

Für Python konnte ich keine Möglichkeit finden, die partitionierte Tabelle in der BigQuery-Ausgabe festzulegen. Wenn Sie dies nicht tun können, ändert sich die Art und Weise, wie Sie Geld für die Ausführung von Abfragen ausgeben, erheblich, was ein Problem darstellt. Außerdem wird nur Python2 unterstützt.

スクリーンショット 2018-08-05 17.41.07.png

Verarbeitete Tabelle スクリーンショット 2018-08-05 17.54.09.png

Datenverarbeitung mit Java

Offizielles Dokument

Java ist aus dem Beispiel genauso einfach zu machen. Insbesondere kann Java nicht mit nur einer Datei wie Python erstellt werden. Es kann einfacher sein, basierend auf dem ersten Datenfluss des Beispiels zu starten.

Ich habe das Beispiel WordCount.java geändert.

ParseUser.java


package com.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.*;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;

import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.json.JSONObject;

public class ParseUser {
    public interface ParseUserPipelineOptions extends GcpOptions {
//        @Description("BigQuery dataset name")
//        @Default.String("tweet")
//        String getDataset();
//        void setDataset(String dataset);
//
//        @Description("BigQuery input table name")
//        @Default.String("SiroTalkTest2")
//        String getInputTable();
//        void setInputTable(String table);

        @Description("BigQuery table schema file")
        @Default.String("schema.json")
        String getSchemaFile();
        void setSchemaFile(String schemaFile);

//        @Description("BigQuery output table name")
//        @Default.String("SiroTalkJava")
//        String getOutputTable();
//        void setOutputTable(String outputTable);
    }

    public static class ParseUserFn extends DoFn<TableRow, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            TableRow tweet = c.element();
            Long tweetId = Long.parseLong(tweet.get("tweet_id").toString());
            Long ct = Long.parseLong(tweet.get("ct").toString());
            String fullText = tweet.get("full_text").toString();
            String user = tweet.get("user").toString();

            Instant instant = Instant.ofEpochSecond(ct);
            String tweetDateString = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Tokyo")));

            JSONObject userJson = new JSONObject(user);
            Long userId = Long.parseLong(userJson.get("id").toString());
            String screenName = userJson.get("screen_name").toString();
            String name = userJson.get("name").toString();

            TableRow outputRow = new TableRow()
                    .set("tweet_id", tweetId)
                    .set("ct", ct)
                    .set("create_time", tweetDateString)
                    .set("full_text", fullText)
                    .set("user_id", userId)
                    .set("user_screen_name", screenName)
                    .set("user_name", name);
            c.output(outputRow);
        }
    }

    public static void main(String[] args) throws IOException {
        final ParseUserPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ParseUserPipelineOptions.class);

        final String tableSchemaJson = new String(Files.readAllBytes(Paths.get(options.getSchemaFile())), Charset.forName("UTF-8"));
        final TableSchema tableSchema = new TableSchema().setFields(new ObjectMapper().reader().forType(new TypeReference<List<TableFieldSchema>>() {}).readValue(tableSchemaJson));

        final Pipeline p = Pipeline.create(options);
        PCollection<TableRow> inputRows = p.apply("ReadFromBQ", BigQueryIO.readTableRows()
                .fromQuery("SELECT tweet_id, ct, full_text, user FROM [<project_name>:tweet.SiroTalk]"));
        PCollection<TableRow> outputRows = inputRows.apply(ParDo.of(new ParseUserFn()));

        outputRows.apply("WriteToBQ", BigQueryIO.writeTableRows()
                .to("<project_name>:tweet.SiroTalkJava")
                .withSchema(tableSchema)
                .withTimePartitioning(new TimePartitioning().setField("ct"))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY));
        p.run().waitUntilFinish();
    }
}

schema.json


[
  {
    "name": "tweet_id",
    "type": "INTEGER"
  },
  {
    "name": "ct",
    "type": "TIMESTAMP"
  },
  {
    "name": "create_time",
    "type": "DATETIME"
  },
  {
    "name": "full_text",
    "type": "STRING"
  },
  {
    "name": "user_id",
    "type": "INTEGER"
  },
  {
    "name": "user_screen_name",
    "type": "STRING"
  },
  {
    "name": "user_name",
    "type": "STRING"
  }
]

Lauf


mvn compile exec:java \
      -Dexec.mainClass=com.example.ParseUser \
      -Dexec.args="--project=<project-id> \
      --stagingLocation=<bucket_path>/staging/ \
      --runner=DataflowRunner"

Das Schema der Ausgabetabelle ist in schema.json geschrieben. Es wird direkt unter dem auszuführenden Verzeichnis abgelegt. Wenn Sie ParseUserPipelineOptions ordnungsgemäß verwenden, können Sie die Argumente zur Laufzeit verwenden, vorerst jedoch hart codieren.

Ich ziehe eine Abfrage ähnlich wie Python und verarbeite sie mit ParseUserFn. Die Art zu schreiben hier ist einzigartig und ich ahme es fast nach.

スクリーンショット 2018-08-05 17.39.46.png

Verarbeitete Tabelle スクリーンショット 2018-08-05 17.53.26.png

BigQuery-Ausgabepartition, jedoch für Java

.Geben Sie die Option withTimePartitioning an.


 Dies liegt daran, dass Sie nur den TIMESTAMP-Typ oder den DATE-Typ angeben können
 Dieses Mal habe ich das CT, das ich in Long als TIMESTAMP eingegeben habe, eingefügt und es zu einer Partitionsspalte gemacht.
 `` `Dies ist eine geteilte Tabelle, wenn Sie eine einfache BigQuery eingeben. Es gab einen Pop namens `` ``
 Es kam nicht mit dieser Art des Puttens heraus.
 Die Verarbeitungskapazität zum Zeitpunkt von WHERE ct wurde jedoch ordnungsgemäß reduziert, sodass es in Ordnung sein sollte ...
 Ist es der Unterschied zwischen _PARTITIONTIME und Spalte?

 Ehemaliger Tisch
 <img width="303" alt="スクリーンショット 2018-08-05 17.43.18.png " src="https://qiita-image-store.s3.amazonaws.com/0/265518/ceef5162-f013-a8d4-7048-805f4f3e25b3.png ">
 Nachbearbeitungstabelle
 <img width="222" alt="スクリーンショット 2018-08-05 17.43.24.png " src="https://qiita-image-store.s3.amazonaws.com/0/265518/a0ca872d-0e44-9d82-182b-684175475089.png ">



 Ich habe es mit Apache Beam Version 2.4.0 gemacht, aber als ich es auf die neueste Version 2.5.0 aktualisiert habe, hat es nicht mehr funktioniert.
 Ich habe es nicht im Detail untersucht, aber mir wurde gesagt, dass es keine Funktion gibt.

# 4. Machen Sie die verarbeitete Tabelle im Dashboard sichtbar ...
 Weiter ... vielleicht kann ich es nicht sofort tun

 Dieses Mal habe ich zuerst eine einfache Datenverarbeitung versucht.
 Es gibt andere Möglichkeiten, die Pipeline zu verwenden, sodass wahrscheinlich viele Dinge getan werden können.
 Es war schwierig, weil es keine Informationen gab ...

 Ich habe noch nichts Kompliziertes gemacht, daher kann ich nicht sagen, welches besser ist, Python oder Java.
 Python war einfacher zu machen.
 Bisher diesmal vorerst.


Recommended Posts

Praxis der Erstellung einer Datenanalyseplattform mit BigQuery und Cloud DataFlow (Datenverarbeitung)
Holen Sie sich mit Python eine große Menge von Starbas Twitter-Daten und probieren Sie die Datenanalyse Teil 1 aus
Praxis der Datenanalyse durch Python und Pandas (Tokyo COVID-19 Data Edition)
Ich habe 0 Jahre Programmiererfahrung und fordere die Datenverarbeitung mit Python heraus
Beispiel für eine effiziente Datenverarbeitung mit PANDAS
Python-Übungsdatenanalyse Zusammenfassung des Lernens, dass ich ungefähr 10 mit 100 Schlägen getroffen habe
Führen Sie die API des Cloud Pak für Datenanalyseprojekts Job mit Umgebungsvariablen aus
Erstellen Sie eine Datenanalyseumgebung, die die GitHub-Authentifizierung und Django mit JupyterHub verbindet
Ablauf beim Erstellen einer virtuellen Umgebung mit Anaconda
Verarbeitung und Beurteilung des Datenanalyseplans (Teil 1)
Aufgezeichnete Umgebung für die Datenanalyse mit Python
Verarbeitung und Beurteilung des Datenanalyseplans (Teil 2)
Bildverarbeitung mit Python (ich habe versucht, es in 0 und 1 Mosaikkunst zu binarisieren)
Organisation grundlegender Verfahren zur Datenanalyse und statistischen Verarbeitung (4)
Analyse von Finanzdaten durch Pandas und deren Visualisierung (2)
Verarbeitung von CSV-Daten in voller und halber Breite in Python
Führen Sie eine Pipeline für maschinelles Lernen mit Cloud Dataflow (Python) aus.
Analyse von Finanzdaten durch Pandas und deren Visualisierung (1)
Organisation grundlegender Verfahren zur Datenanalyse und statistischen Verarbeitung (2)
Fordern Sie die Hauptkomponentenanalyse von Textdaten mit Python heraus
Geschichte der Bildanalyse von PDF-Dateien und Datenextraktion
Analyse der Messdaten (2) -Hydrobacter und Anpassung, lmfit Empfehlung-
Sehen Sie sich das Profiling und Dumping mit Dataflow an
Überblick über die Verarbeitung natürlicher Sprache und ihre Datenvorverarbeitung
Erstellen Sie mit Python einen Entscheidungsbaum aus 0 und verstehen Sie ihn (3. Datenanalysebibliothek Pandas Edition)
Eine Geschichte über das Lesen eines Bilderbuchs durch Synthetisieren von Sprache mit COTOHA API und Cloud Vision API
"Zeitreihenanalyse von Wirtschafts- und Finanzdaten messen" Das Problem am Ende des Kapitels mit Python lösen
Versuchen Sie es mit morphologischer Analyse und Markov-Kette mit Django (Ari mit viel Raum für Verbesserungen)
Eine Geschichte, die mithilfe einer kostenlosen Testversion der Google Cloud Platform zu einer neuen Koronaanalyse beiträgt
Datenanalyse mit Python
Erstellen Sie eine Datenanalyseumgebung mit Kedro + MLflow + Github-Aktionen
Mit den Daten von COVID-19 wurde ein Netzwerkdiagramm erstellt.
Als Ergebnis der Montage und Abstimmung mit POH! Lite
Führen Sie mit Python und Matplotlib eine Isostromanalyse offener Wasserkanäle durch
Erstellen Sie Anwendungen, registrieren Sie Daten und teilen Sie sie mit einer einzigen E-Mail
Erkennen Sie mit Python Objekte einer bestimmten Farbe und Größe
Eine Sammlung von Methoden, die beim Aggregieren von Daten mit Pandas verwendet werden
Senden und Empfangen von Daten mit MQTT über die Watson IoT Platform
Ein Python-Anfänger hat in den letzten 10 Jahren zunächst versucht, die Wetterdaten schnell und einfach zu analysieren.