La dernière fois que j'ai écrit que j'utiliserais Scio au lieu de Java et Python. Tout d'abord, j'ai essayé d'utiliser Java et Python pour m'entraîner. Je ne suis pas allé à Scio cette fois.
Ce que je veux faire ** Importez les données traitées dans BigQuery à partir de BigQuery contenant des données brutes ** est.
L'environnement est Python 2.7.13 + Apache Beam 2.5.0 Java 1.8.0_25 + Apache Beam 2.4.0
Lorsque j'ai été recherché sur Google, j'ai eu beaucoup de problèmes car de nombreux articles de la série Cloud Dataflow 1 ont été capturés. Même le document officiel est assez coincé dans le 1er système. Je pense qu'il était préférable de lire Apache Beam de toutes mes forces.
De plus, la documentation officielle de GCP a différentes versions en japonais et en anglais, et le japonais est souvent ancien. Je pense que vous pouvez être satisfait des extensions Chrome suivantes. GCP outdated docs checker
Document officiel Tout d'abord, vous pouvez comprendre le flux en effectuant l'exemple de comptage de mots du document officiel.
À partir des données de tweet d'origine qui ont été placées dans BigQuery la dernière fois Décompose les informations utilisateur et génère l'ID utilisateur, le nom d'utilisateur et le nom d'écran. Voici le programme d'exécution.
La table tweet.SiroTalk est l'entrée et la table tweet.SiroTalkPython3 est la sortie.
parseuser.py
from __future__ import absolute_import
import argparse
import logging
import re
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import json
from datetime import datetime
def parse_user(element):
tweet_id = element['tweet_id']
ct = element['ct']
full_text = element['full_text']
user = element['user']
user_json = json.loads(user)
user_id = user_json['id']
user_screen_name = user_json['screen_name']
user_name = user_json['name']
create_time = datetime.fromtimestamp(ct).strftime('%Y-%m-%d %H:%M:%S')
return {
'tweet_id': tweet_id,
'create_time': create_time,
'full_text': full_text,
'user_id': user_id,
'user_screen_name': user_screen_name,
'user_name': user_name
}
def run(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
# '--runner=DataflowRunner',
'--runner=DirectRunner',
'--project=<project-id>',
'--staging_location=<bucket_path>/staging',
'--temp_location=<bucket_path>/temp',
'--job_name=<job_name>',
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
query = 'SELECT tweet_id, ct, full_text, user FROM tweet.SiroTalk'
(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='<project_name>', use_standard_sql=False, query=query))
| 'modify' >> beam.Map(parse_user)
| 'write' >> beam.io.Write(beam.io.BigQuerySink(
'tweet.SiroTalkPython',
schema='tweet_id:INTEGER, create_time:DATETIME, full_text:STRING, user_id:INTEGER, user_screen_name:STRING, user_name:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Courir
$ python parseuser.py
En fait, il est plus général d'exécuter avec des paramètres, mais cette fois je l'ai codé en dur pour la pratique.
Les données sont traitées par parse_user (élément): L'élément contient une ligne du tableau. Configurez la forme que vous souhaitez générer ici et renvoyez-la.
Pour Python, je n'ai pas trouvé de moyen de définir la table partitionnée dans la sortie BigQuery. Si vous ne pouvez pas faire cela, la façon dont vous dépensez de l'argent lors de l'exécution des requêtes changera considérablement, ce qui pose un problème. De plus, seul python2 est pris en charge.
Table traitée
Java est tout aussi facile à faire à partir de l'exemple. En particulier, Java ne peut pas être créé avec un seul fichier comme python. Il peut être plus facile de démarrer en fonction de l'exemple de premier flux de données.
J'ai modifié l'exemple WordCount.java.
ParseUser.java
package com.example;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.*;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.json.JSONObject;
public class ParseUser {
public interface ParseUserPipelineOptions extends GcpOptions {
// @Description("BigQuery dataset name")
// @Default.String("tweet")
// String getDataset();
// void setDataset(String dataset);
//
// @Description("BigQuery input table name")
// @Default.String("SiroTalkTest2")
// String getInputTable();
// void setInputTable(String table);
@Description("BigQuery table schema file")
@Default.String("schema.json")
String getSchemaFile();
void setSchemaFile(String schemaFile);
// @Description("BigQuery output table name")
// @Default.String("SiroTalkJava")
// String getOutputTable();
// void setOutputTable(String outputTable);
}
public static class ParseUserFn extends DoFn<TableRow, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) {
TableRow tweet = c.element();
Long tweetId = Long.parseLong(tweet.get("tweet_id").toString());
Long ct = Long.parseLong(tweet.get("ct").toString());
String fullText = tweet.get("full_text").toString();
String user = tweet.get("user").toString();
Instant instant = Instant.ofEpochSecond(ct);
String tweetDateString = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Tokyo")));
JSONObject userJson = new JSONObject(user);
Long userId = Long.parseLong(userJson.get("id").toString());
String screenName = userJson.get("screen_name").toString();
String name = userJson.get("name").toString();
TableRow outputRow = new TableRow()
.set("tweet_id", tweetId)
.set("ct", ct)
.set("create_time", tweetDateString)
.set("full_text", fullText)
.set("user_id", userId)
.set("user_screen_name", screenName)
.set("user_name", name);
c.output(outputRow);
}
}
public static void main(String[] args) throws IOException {
final ParseUserPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ParseUserPipelineOptions.class);
final String tableSchemaJson = new String(Files.readAllBytes(Paths.get(options.getSchemaFile())), Charset.forName("UTF-8"));
final TableSchema tableSchema = new TableSchema().setFields(new ObjectMapper().reader().forType(new TypeReference<List<TableFieldSchema>>() {}).readValue(tableSchemaJson));
final Pipeline p = Pipeline.create(options);
PCollection<TableRow> inputRows = p.apply("ReadFromBQ", BigQueryIO.readTableRows()
.fromQuery("SELECT tweet_id, ct, full_text, user FROM [<project_name>:tweet.SiroTalk]"));
PCollection<TableRow> outputRows = inputRows.apply(ParDo.of(new ParseUserFn()));
outputRows.apply("WriteToBQ", BigQueryIO.writeTableRows()
.to("<project_name>:tweet.SiroTalkJava")
.withSchema(tableSchema)
.withTimePartitioning(new TimePartitioning().setField("ct"))
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY));
p.run().waitUntilFinish();
}
}
schema.json
[
{
"name": "tweet_id",
"type": "INTEGER"
},
{
"name": "ct",
"type": "TIMESTAMP"
},
{
"name": "create_time",
"type": "DATETIME"
},
{
"name": "full_text",
"type": "STRING"
},
{
"name": "user_id",
"type": "INTEGER"
},
{
"name": "user_screen_name",
"type": "STRING"
},
{
"name": "user_name",
"type": "STRING"
}
]
Courir
mvn compile exec:java \
-Dexec.mainClass=com.example.ParseUser \
-Dexec.args="--project=<project-id> \
--stagingLocation=<bucket_path>/staging/ \
--runner=DataflowRunner"
Le schéma de la table de sortie est écrit dans schema.json. Il est placé directement sous le répertoire à exécuter. Encore une fois, si vous utilisez correctement ParseUserPipelineOptions, vous pouvez utiliser les arguments au moment de l'exécution, mais pour le moment, le codage en dur.
Je tire une requête similaire à python et la traite avec ParseUserFn. La manière d'écrire ici est unique et je l'imite presque.
Table traitée
Partition de sortie BigQuery, mais pour Java
.Spécifiez à l'aide de l'option withTimePartitioning.
En effet, vous ne pouvez spécifier que le type TIMESTAMP ou le type DATE
Cette fois, j'ai mis le ct que j'ai mis dans Long comme TIMESTAMP et en ai fait une colonne de partition.
`` Il s'agit d'une table fractionnée lorsque vous insérez un BigQuery simple. Il y avait un pop appelé `` ''
Il n'est pas venu avec cette façon de dire.
Cependant, la capacité de traitement au moment de WHERE ct a été réduite correctement, donc ça devrait aller ...
Est-ce la différence entre _PARTITIONTIME et column?
Ancien tableau
<img width="303" alt="スクリーンショット 2018-08-05 17.43.18.png " src="https://qiita-image-store.s3.amazonaws.com/0/265518/ceef5162-f013-a8d4-7048-805f4f3e25b3.png ">
Table de post-traitement
<img width="222" alt="スクリーンショット 2018-08-05 17.43.24.png " src="https://qiita-image-store.s3.amazonaws.com/0/265518/a0ca872d-0e44-9d82-182b-684175475089.png ">
Je l'ai fait avec Apache Beam version 2.4.0, mais lorsque je l'ai mis à niveau vers la dernière version 2.5.0, il a cessé de fonctionner.
Je ne l'ai pas étudié en détail, mais on m'a dit qu'il n'y avait pas de fonction.
# 4. Rendre la table traitée visible sur le tableau de bord ...
Ensuite ... peut-être que je ne peux pas le faire tout de suite
Cette fois, j'ai d'abord essayé le simple traitement des données.
Il existe d'autres façons d'utiliser le pipeline, de sorte qu'il y aura probablement beaucoup de choses à faire.
C'était difficile car il n'y avait aucune information ...
Je n'ai encore rien fait de compliqué, donc je ne peux pas dire lequel est le meilleur, python ou Java.
python était plus facile à faire.
Jusqu'ici cette fois pour le moment.
Recommended Posts