Mit Google Cloud Dataflow können Sie Daten an mehrere Standorte ausgeben, indem Sie die Pipeline verzweigen. Bei dieser Methode muss das Ausgabeziel jedoch im Voraus festgelegt werden, und es ist (wahrscheinlich) nicht möglich, so etwas wie "Ausgabe an eine in den Daten angegebene BigQuery-Tabelle" durchzuführen. Ich suchte nach einem Weg, um eine solche ** dynamische Ausgabeverteilung ** zu erreichen, und als allgemeiner Zweck DynamicDestinations
Ich fand, dass ich die Klasse .0 / org / apache /beam / sdk / io / gcp / bigquery / DynamicDestinations.html verwenden sollte.
In diesem Artikel werden wir die Tabellensortierung auf zwei Arten versuchen: die Methode mit diesen DynamicDestinations
und die vereinfachte Methode.
Da ich diesmal nur an der Ausgabe interessiert bin, werde ich "den von Pub / Sub erhaltenen Text so speichern, wie er in BigQuery ist", ohne zusätzliche Datenverarbeitung. Die Pipeline ist jedoch verzweigt und es werden zwei Arten der Ausgabeverarbeitung hinzugefügt, damit sie mit der Ausgabe einer normalen Einzeltabelle verglichen werden kann.
Das Folgende ist ein spezifisches Beispiel für die in BigQuery gespeicherten Daten.
$ table="<project_id>:<dataset_id>.<table_id>"
$ bq head -n 3 "${table}"
+-----------+
| text |
+-----------+
| Charizard |
| Alakazam |
| Nidoqueen |
+-----------+
$ bq head -n 3 "${table}_C"
+------------+
| text |
+------------+
| Cloyster |
| Clefairy |
| Charmander |
+------------+
$ {table}
enthält alle Eingabedaten, während $ {table} _C
beispielsweise nur den Text enthält, der mit C beginnt. Jede Verteilungstabelle ist ein Code-generierter Name, und zur Laufzeit werden nur die von Ihnen benötigten Namen erstellt.
src/main/java/com/example/dataflow/DynamicDestinationsPipeline.java
package com.example.dataflow;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
/**
* An example of BigQuery dynamic destinations.
*
* <p>To run this example using managed resource in Google Cloud
* Platform, you should specify the following command-line options:
* --project=<YOUR_PROJECT_ID>
* --jobName=<JOB_NAME>
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
* --tempLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
* --runner=DataflowRunner
* --input=<PUBSUB_INPUT_TOPIC>
* --output=<BIGQUERY_OUTPUT_TABLE>
*/
public class DynamicDestinationsPipeline {
/**
* Add commandline options: input (Pub/Sub topic ID) and output (BigQuery table ID).
*/
public interface MyOptions extends PipelineOptions {
@Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
@Validation.Required
String getInput();
void setInput(String value);
@Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>")
@Validation.Required
String getOutput();
void setOutput(String value);
}
/**
* Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
* Destination table name will be {@code <table_id>_<initial for input string>}.
*/
static class MyDestinations extends DynamicDestinations<String, String> {
private final String tablePrefix;
MyDestinations(String tableId) {
tablePrefix = tableId + "_";
}
/**
* Returns a destination table specifier: initial for input string.
*/
@Override
public String getDestination(ValueInSingleWindow<String> element) {
return element.getValue().substring(0, 1);
}
/**
* Returns a TableDestination object for the destination.
* The table name will be {@code <table_id>_<initial for input string>}.
*/
@Override
public TableDestination getTable(String destination) {
return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
}
/**
* Returns a table schema for the destination.
* The table has only one column: text as STRING.
*/
@Override
public TableSchema getSchema(String destination) {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("text").setType("STRING"));
return new TableSchema().setFields(fields);
}
}
/**
* A formatter to convert an input text to a BigQuery table row.
*/
static class MyFormatFunction implements SerializableFunction<String, TableRow> {
@Override
public TableRow apply(String input) {
return new TableRow().set("text", input);
}
}
/**
* Run a pipeline.
* It reads texts from a Pub/Sub topic and writes them to BigQuery tables.
* The master table saves all the texts, and other tables save the texts with same initials.
*/
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("text").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
Pipeline p = Pipeline.create(options);
PCollection<String> texts = p.apply("ReadFromPubSub", PubsubIO.readStrings()
.fromTopic(options.getInput()));
texts.apply("WriteToBigQuery", BigQueryIO.<String>write()
.to(options.getOutput())
.withSchema(schema)
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
.to(new MyDestinations(options.getOutput()))
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run();
}
}
withSchema ()
ist nicht erforderlich. In DynamicDestinations
enthalten.getDestination ()
gibt ein Objekt (Ziel) zurück, um die Verteilung von Daten zu identifizieren. Diesmal ist es die "Textinitiale" und der Typ ist "String".getTable ()
gibt die Tabelle zurück, die dem Ziel entspricht. Dieses Mal werden die Initialen zum Namen der Basistabelle [^ invalid-initial] hinzugefügt.getSchema ()
gibt das Schema der Tabelle zurück, die dem Ziel entspricht. Dieses Mal ist es allen Tabellen gemeinsam, daher erstelle ich genau das Gleiche. (* Hier verstopft. Siehe [Nächster Abschnitt](# Verstopft))[^ invalid-initial]: Der Umgang mit Zeichen, die nicht in Tabellennamen verwendet werden können, wurde übersprungen.
Erstellen Sie vor dem Ausführen ein Pub / Sub-Thema und ein BigQuery-Dataset. Wenn Sie stagingLocation usw. angeben, benötigen Sie auch einen Storage Bucket.
Sie können über die GUI in Pub / Sub veröffentlichen, aber ich habe das folgende Ruby-Skript verwendet, um die Textdatei zu streamen.
publish.rb
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new(
project_id: "<project_id>",
credentials: "credential.json"
)
topic = pubsub.topic "<topic_id>"
while (text = gets)
topic.publish text.chomp
end
$ head -3 list.txt
Bulbasaur
Ivysaur
Venusaur
$ ruby publish.rb < list.txt
Zuerst habe ich das Schema auch an die Klasse "MyDestinations" übergeben, aber als ich es ausführte, bekam ich eine "IllegalArgumentException". Nach der Suche fand ich Gleicher Fehler und serialisierte. Ich stellte fest, dass ich nicht das, was ich nicht konnte, in die Klasse aufnehmen konnte. Vorläufig habe ich beschlossen, es jedes Mal in getSchema () zu erstellen, um den Fehler zu vermeiden.
Nicht auf dieses Beispiel beschränkt, kann es Fälle geben, in denen ** Tabellen sortiert sind, das Schema jedoch genau das gleiche ist **. In diesem Fall können Sie "SerializableFunction" in "to ()" angeben, um nur den Tabellennamen dynamisch zu machen (BigQueryIO. (Siehe "Sharding von BigQuery-Ausgabetabellen" in /org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html). Das Schema wird normalerweise mit withSchema ()
angegeben.
Der Inhalt von "SerializableFunction" implementiert nur den Prozess des Kombinierens von "getDestination ()" und "getTable ()" von "DynamicDestinations". Der folgende Code erstellt eine lokale Variable namens "Ziel", daher sollte es leicht sein, die Entsprechung mit dem vorherigen Code zu verstehen.
/**
* Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
* Destination table name will be {@code <table_id>_<initial for input string>}.
*/
static class MyDestinationsFunction
implements SerializableFunction<ValueInSingleWindow<String>, TableDestination> {
private final String tablePrefix;
MyDestinationsFunction(String tableId) {
tablePrefix = tableId + "_";
}
@Override
public TableDestination apply(ValueInSingleWindow<String> input) {
String destination = input.getValue().substring(0, 1);
return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
}
}
...
texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
.to(new MyDestinationsFunction(options.getOutput()))
.withSchema(schema)
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Recommended Posts