Übung zum Erstellen einer Datenanalyseplattform mit BigQuery und Cloud DataFlow (Vorbereitung)
Das letzte Mal habe ich geschrieben, dass ich Scio anstelle von Java und Python verwenden würde. Zuerst habe ich versucht, Java und Python zum Üben zu verwenden. Diesmal bin ich nicht zu Scio gegangen.
Was ich machen will; was ich vorhabe zu tun ** Verarbeitete Daten aus BigQuery, die Rohdaten enthalten, in BigQuery einfügen ** ist.
Die Umwelt ist Python 2.7.13 + Apache Beam 2.5.0 Java 1.8.0_25 + Apache Beam 2.4.0
Als ich gegoogelt wurde, hatte ich große Probleme, weil viele Artikel der Cloud Dataflow 1-Serie abgefangen wurden. Sogar das offizielle Dokument steckt ziemlich fest im 1. System.
Außerdem enthält die offizielle GCP-Dokumentation verschiedene Versionen in Japanisch und Englisch, und Japanisch ist häufig alt. Ich denke, Sie werden sich freuen, wenn Sie die folgenden Chrome-Erweiterungen hinzufügen. Datenverarbeitung mit Python Offizielles Dokument Zunächst können Sie den Ablauf anhand der Beispielwortanzahl des offiziellen Dokuments verstehen. Aus den ursprünglichen Tweet-Daten, die beim letzten Mal in BigQuery gespeichert wurden Zerlegt Benutzerinformationen und gibt Benutzer-ID, Benutzernamen und Bildschirmnamen aus. Das folgende ist das Ausführungsprogramm.
Die tweet.SiroTalk-Tabelle ist die Eingabe und die tweet.SiroTalkPython3-Tabelle ist die Ausgabe.
parseuser.py
from __future__ import absolute_import
import argparse
import logging
import re
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import json
from datetime import datetime
def parse_user(element):
tweet_id = element['tweet_id']
ct = element['ct']
full_text = element['full_text']
user = element['user']
user_json = json.loads(user)
user_id = user_json['id']
user_screen_name = user_json['screen_name']
user_name = user_json['name']
create_time = datetime.fromtimestamp(ct).strftime('%Y-%m-%d %H:%M:%S')
return {
'tweet_id': tweet_id,
'create_time': create_time,
'full_text': full_text,
'user_id': user_id,
'user_screen_name': user_screen_name,
'user_name': user_name
}
def run(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
# '--runner=DataflowRunner',
'--runner=DirectRunner',
'--project=<project-id>',
'--staging_location=<bucket_path>/staging',
'--temp_location=<bucket_path>/temp',
'--job_name=<job_name>',
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
query = 'SELECT tweet_id, ct, full_text, user FROM tweet.SiroTalk'
(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='<project_name>', use_standard_sql=False, query=query))
| 'modify' >> beam.Map(parse_user)
| 'write' >> beam.io.Write(beam.io.BigQuerySink(
'tweet.SiroTalkPython',
schema='tweet_id:INTEGER, create_time:DATETIME, full_text:STRING, user_id:INTEGER, user_screen_name:STRING, user_name:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Lauf
$ python parseuser.py
Eigentlich ist es allgemeiner, mit Parametern auszuführen, aber dieses Mal habe ich es für die Praxis fest codiert.
Die Daten werden von parse_user (Element) verarbeitet: Element enthält eine Zeile der Tabelle. Konfigurieren Sie die Form, die Sie hier ausgeben möchten, und geben Sie sie zurück.
Für Python konnte ich keine Möglichkeit finden, die partitionierte Tabelle in der BigQuery-Ausgabe festzulegen. Wenn Sie dies nicht tun können, ändert sich die Art und Weise, wie Sie Geld für die Ausführung von Abfragen ausgeben, erheblich, was ein Problem darstellt. Außerdem wird nur Python2 unterstützt.
Verarbeitete Tabelle
Java ist aus dem Beispiel genauso einfach zu machen. Insbesondere kann Java nicht mit nur einer Datei wie Python erstellt werden. Es kann einfacher sein, basierend auf dem ersten Datenfluss des Beispiels zu starten.
Ich habe das Beispiel WordCount.java geändert.
ParseUser.java
package com.example;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.*;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.json.JSONObject;
public class ParseUser {
public interface ParseUserPipelineOptions extends GcpOptions {
// @Description("BigQuery dataset name")
// @Default.String("tweet")
// String getDataset();
// void setDataset(String dataset);
//
// @Description("BigQuery input table name")
// @Default.String("SiroTalkTest2")
// String getInputTable();
// void setInputTable(String table);
@Description("BigQuery table schema file")
@Default.String("schema.json")
String getSchemaFile();
void setSchemaFile(String schemaFile);
// @Description("BigQuery output table name")
// @Default.String("SiroTalkJava")
// String getOutputTable();
// void setOutputTable(String outputTable);
}
public static class ParseUserFn extends DoFn<TableRow, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) {
TableRow tweet = c.element();
Long tweetId = Long.parseLong(tweet.get("tweet_id").toString());
Long ct = Long.parseLong(tweet.get("ct").toString());
String fullText = tweet.get("full_text").toString();
String user = tweet.get("user").toString();
Instant instant = Instant.ofEpochSecond(ct);
String tweetDateString = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Tokyo")));
JSONObject userJson = new JSONObject(user);
Long userId = Long.parseLong(userJson.get("id").toString());
String screenName = userJson.get("screen_name").toString();
String name = userJson.get("name").toString();
TableRow outputRow = new TableRow()
.set("tweet_id", tweetId)
.set("ct", ct)
.set("create_time", tweetDateString)
.set("full_text", fullText)
.set("user_id", userId)
.set("user_screen_name", screenName)
.set("user_name", name);
c.output(outputRow);
}
}
public static void main(String[] args) throws IOException {
final ParseUserPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ParseUserPipelineOptions.class);
final String tableSchemaJson = new String(Files.readAllBytes(Paths.get(options.getSchemaFile())), Charset.forName("UTF-8"));
final TableSchema tableSchema = new TableSchema().setFields(new ObjectMapper().reader().forType(new TypeReference<List<TableFieldSchema>>() {}).readValue(tableSchemaJson));
final Pipeline p = Pipeline.create(options);
PCollection<TableRow> inputRows = p.apply("ReadFromBQ", BigQueryIO.readTableRows()
.fromQuery("SELECT tweet_id, ct, full_text, user FROM [<project_name>:tweet.SiroTalk]"));
PCollection<TableRow> outputRows = inputRows.apply(ParDo.of(new ParseUserFn()));
outputRows.apply("WriteToBQ", BigQueryIO.writeTableRows()
.to("<project_name>:tweet.SiroTalkJava")
.withSchema(tableSchema)
.withTimePartitioning(new TimePartitioning().setField("ct"))
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY));
p.run().waitUntilFinish();
}
}
schema.json
[
{
"name": "tweet_id",
"type": "INTEGER"
},
{
"name": "ct",
"type": "TIMESTAMP"
},
{
"name": "create_time",
"type": "DATETIME"
},
{
"name": "full_text",
"type": "STRING"
},
{
"name": "user_id",
"type": "INTEGER"
},
{
"name": "user_screen_name",
"type": "STRING"
},
{
"name": "user_name",
"type": "STRING"
}
]
Lauf
mvn compile exec:java \
-Dexec.mainClass=com.example.ParseUser \
-Dexec.args="--project=<project-id> \
--stagingLocation=<bucket_path>/staging/ \
--runner=DataflowRunner"
Das Schema der Ausgabetabelle ist in schema.json geschrieben. Es wird direkt unter dem auszuführenden Verzeichnis abgelegt. Wenn Sie ParseUserPipelineOptions ordnungsgemäß verwenden, können Sie die Argumente zur Laufzeit verwenden, vorerst jedoch hart codieren.
Ich ziehe eine Abfrage ähnlich wie Python und verarbeite sie mit ParseUserFn. Die Art zu schreiben hier ist einzigartig und ich ahme es fast nach.
Verarbeitete Tabelle
BigQuery-Ausgabepartition, jedoch für Java
.Geben Sie die Option withTimePartitioning an.
Dies liegt daran, dass Sie nur den TIMESTAMP-Typ oder den DATE-Typ angeben können
Dieses Mal habe ich das CT, das ich in Long als TIMESTAMP eingegeben habe, eingefügt und es zu einer Partitionsspalte gemacht.
`` `Dies ist eine geteilte Tabelle, wenn Sie eine einfache BigQuery eingeben. Es gab einen Pop namens `` ``
Es kam nicht mit dieser Art des Puttens heraus.
Die Verarbeitungskapazität zum Zeitpunkt von WHERE ct wurde jedoch ordnungsgemäß reduziert, sodass es in Ordnung sein sollte ...
Ist es der Unterschied zwischen _PARTITIONTIME und Spalte?
Ehemaliger Tisch
<img width="303" alt="スクリーンショット 2018-08-05 17.43.18.png " src="https://qiita-image-store.s3.amazonaws.com/0/265518/ceef5162-f013-a8d4-7048-805f4f3e25b3.png ">
Nachbearbeitungstabelle
<img width="222" alt="スクリーンショット 2018-08-05 17.43.24.png " src="https://qiita-image-store.s3.amazonaws.com/0/265518/a0ca872d-0e44-9d82-182b-684175475089.png ">
Ich habe es mit Apache Beam Version 2.4.0 gemacht, aber als ich es auf die neueste Version 2.5.0 aktualisiert habe, hat es nicht mehr funktioniert.
Ich habe es nicht im Detail untersucht, aber mir wurde gesagt, dass es keine Funktion gibt.
# 4. Machen Sie die verarbeitete Tabelle im Dashboard sichtbar ...
Weiter ... vielleicht kann ich es nicht sofort tun
Dieses Mal habe ich zuerst eine einfache Datenverarbeitung versucht.
Es gibt andere Möglichkeiten, die Pipeline zu verwenden, sodass wahrscheinlich viele Dinge getan werden können.
Es war schwierig, weil es keine Informationen gab ...
Ich habe noch nichts Kompliziertes gemacht, daher kann ich nicht sagen, welches besser ist, Python oder Java.
Python war einfacher zu machen.
Bisher diesmal vorerst.
Recommended Posts