With Google Cloud Dataflow, you can output data to multiple destinations by branching the pipeline. With that approach, however, the destinations have to be decided in advance; as far as I can tell, you cannot do something like "output to whichever BigQuery table the data itself specifies." While looking for a way to achieve this kind of **dynamic output routing**, I found that the general-purpose `DynamicDestinations` class is the way to do it (see the `DynamicDestinations` entry in the Beam Javadoc).
In this article, I try routing records to different tables in two ways: with `DynamicDestinations`, and with a simplified method.
Since I am only interested in the output side this time, the pipeline simply "saves the text read from Pub/Sub into BigQuery as is," without any extra processing. The pipeline is branched, however, and two kinds of output are attached so that the result can be compared with output to a normal single table.
The data stored in BigQuery looks like this:
$ table="<project_id>:<dataset_id>.<table_id>"
$ bq head -n 3 "${table}"
+-----------+
| text |
+-----------+
| Charizard |
| Alakazam |
| Nidoqueen |
+-----------+
$ bq head -n 3 "${table}_C"
+------------+
| text |
+------------+
| Cloyster |
| Clefairy |
| Charmander |
+------------+
`${table}` contains all of the input data, while `${table}_C`, for example, contains only the texts that start with "C". The names of the per-initial tables are generated in the code, and only the tables that are actually needed are created at runtime.
src/main/java/com/example/dataflow/DynamicDestinationsPipeline.java
package com.example.dataflow;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
/**
* An example of BigQuery dynamic destinations.
*
* <p>To run this example using managed resource in Google Cloud
* Platform, you should specify the following command-line options:
* --project=<YOUR_PROJECT_ID>
* --jobName=<JOB_NAME>
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
* --tempLocation=<TEMP_LOCATION_IN_CLOUD_STORAGE>
* --runner=DataflowRunner
* --input=<PUBSUB_INPUT_TOPIC>
* --output=<BIGQUERY_OUTPUT_TABLE>
*/
public class DynamicDestinationsPipeline {
/**
* Add commandline options: input (Pub/Sub topic ID) and output (BigQuery table ID).
*/
public interface MyOptions extends PipelineOptions {
@Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
@Validation.Required
String getInput();
void setInput(String value);
@Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>")
@Validation.Required
String getOutput();
void setOutput(String value);
}
/**
* Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
* Destination table name will be {@code <table_id>_<initial for input string>}.
*/
static class MyDestinations extends DynamicDestinations<String, String> {
private final String tablePrefix;
MyDestinations(String tableId) {
tablePrefix = tableId + "_";
}
/**
* Returns a destination table specifier: initial for input string.
*/
@Override
public String getDestination(ValueInSingleWindow<String> element) {
return element.getValue().substring(0, 1);
}
/**
* Returns a TableDestination object for the destination.
* The table name will be {@code <table_id>_<initial for input string>}.
*/
@Override
public TableDestination getTable(String destination) {
return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
}
/**
* Returns a table schema for the destination.
* The table has only one column: text as STRING.
*/
@Override
public TableSchema getSchema(String destination) {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("text").setType("STRING"));
return new TableSchema().setFields(fields);
}
}
/**
* A formatter to convert an input text to a BigQuery table row.
*/
static class MyFormatFunction implements SerializableFunction<String, TableRow> {
@Override
public TableRow apply(String input) {
return new TableRow().set("text", input);
}
}
/**
* Run a pipeline.
* It reads texts from a Pub/Sub topic and writes them to BigQuery tables.
* The master table saves all the texts, and other tables save the texts with same initials.
*/
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("text").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
Pipeline p = Pipeline.create(options);
PCollection<String> texts = p.apply("ReadFromPubSub", PubsubIO.readStrings()
.fromTopic(options.getInput()));
texts.apply("WriteToBigQuery", BigQueryIO.<String>write()
.to(options.getOutput())
.withSchema(schema)
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
.to(new MyDestinations(options.getOutput()))
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run();
}
}
Compared with the single-table output, the argument of `to()` on `BigQueryIO.Write<T>` becomes a `DynamicDestinations<T, ?>` instead of a table name. Here an instance of the derived class `MyDestinations` is passed as the argument so that the base table name can be handed to it. `withSchema()` is not required, because the schema is supplied by the `DynamicDestinations` itself.
`DynamicDestinations` requires three methods:

- `getDestination()` returns an object (the destination) that identifies where an element should be routed. Here it is the initial letter of the text, with type `String`.
- `getTable()` returns the table corresponding to a destination. Here the initial is appended to the base table name[^invalid-initial].
- `getSchema()` returns the schema of the table for a destination. Here it is the same for every table, so exactly the same schema is created each time. (This is where I got stuck; see below.)

[^invalid-initial]: Handling characters that cannot be used in table names is skipped.
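As a side note on that footnote, here is a minimal sketch (my own addition, not part of the article's code) of one way to keep the destination key usable as a table-name suffix, by allowing only letters, digits, and underscores:

@Override
public String getDestination(ValueInSingleWindow<String> element) {
  // Hypothetical variant of getDestination(): map anything outside
  // [A-Za-z0-9_] (and the empty string) to an underscore so the suffix
  // is always valid in a BigQuery table name.
  String value = element.getValue();
  String initial = value.isEmpty() ? "_" : value.substring(0, 1);
  return initial.matches("[A-Za-z0-9_]") ? initial : "_";
}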
Create a Pub/Sub topic and a BigQuery dataset before running. If you specify `stagingLocation` and the like, you also need a Cloud Storage bucket.
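For reference, the setup can be done with commands along these lines (the resource names are placeholders for your own):
$ gcloud pubsub topics create <topic_id>
$ bq mk --dataset <project_id>:<dataset_id>
$ gsutil mb gs://<bucket_name>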
You can publish to Pub/Sub from the GUI, but I used the following Ruby script to stream a text file.
publish.rb
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new(
project_id: "<project_id>",
credentials: "credential.json"
)
topic = pubsub.topic "<topic_id>"
while (text = gets)
topic.publish text.chomp
end
$ head -3 list.txt
Bulbasaur
Ivysaur
Venusaur
$ ruby publish.rb < list.txt
At first I passed the schema to the `MyDestinations` class as well, but when I ran the pipeline I got an `IllegalArgumentException`. After some searching I found [the same error](https://stackoverflow.com/questions/46165895/dataflow-dynamicdestinations-unable-to-serialize-org-apache-beam-sdk-io-gcp-bigq) and learned that you cannot put anything that is not serializable into the class. For the time being, I avoided the error by creating the schema from scratch every time in `getSchema()`.
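If you do want to parameterize the schema, one workaround, shown here only as a sketch of my own (the class name and constructor are hypothetical), is to hold nothing but serializable values such as field-name strings and build the `TableSchema` on demand inside `getSchema()`. It uses the same imports as the pipeline above.

static class MyDestinationsWithFields extends DynamicDestinations<String, String> {
  private final String tablePrefix;
  // Plain strings in an ArrayList survive Java serialization,
  // unlike a prebuilt TableSchema held as a field.
  private final ArrayList<String> fieldNames;

  MyDestinationsWithFields(String tableId, ArrayList<String> fieldNames) {
    this.tablePrefix = tableId + "_";
    this.fieldNames = fieldNames;
  }

  @Override
  public String getDestination(ValueInSingleWindow<String> element) {
    return element.getValue().substring(0, 1);
  }

  @Override
  public TableDestination getTable(String destination) {
    return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
  }

  @Override
  public TableSchema getSchema(String destination) {
    // Build the schema on every call from the serializable field names.
    List<TableFieldSchema> fields = new ArrayList<>();
    for (String name : fieldNames) {
      fields.add(new TableFieldSchema().setName(name).setType("STRING"));
    }
    return new TableSchema().setFields(fields);
  }
}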
Not limited to this example, there are probably many cases where **rows are routed to different tables but the schema is exactly the same**. In that case, you can pass a `SerializableFunction` to `to()` so that only the table name is dynamic (see "Sharding BigQuery output tables" in the `BigQueryIO` Javadoc). The schema is specified with `withSchema()` as usual.
The `SerializableFunction` only needs to implement what amounts to `getDestination()` and `getTable()` of `DynamicDestinations` combined into one step. The code below introduces a local variable named `destination`, which should make the correspondence with the earlier code easy to see.
/**
* Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
* Destination table name will be {@code <table_id>_<initial for input string>}.
*/
static class MyDestinationsFunction
implements SerializableFunction<ValueInSingleWindow<String>, TableDestination> {
private final String tablePrefix;
MyDestinationsFunction(String tableId) {
tablePrefix = tableId + "_";
}
@Override
public TableDestination apply(ValueInSingleWindow<String> input) {
String destination = input.getValue().substring(0, 1);
return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
}
}
...
texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
.to(new MyDestinationsFunction(options.getOutput()))
.withSchema(schema)
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
The table is specified as "<project_id>:<dataset_id>.<table_id>", with the project ID included, so this should normally be possible as well; I think you only need to get the permissions set up correctly.
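If, for example, that means writing to a table in another project (my reading of the note above; the identifiers below are placeholders), the only change on the pipeline side is the fully qualified table specification passed to `to()`:

// Hypothetical example: same pipeline objects as above, but the table
// specification names a different project.
texts.apply("WriteToAnotherProject", BigQueryIO.<String>write()
    .to("<another_project_id>:<dataset_id>.<table_id>")
    .withSchema(schema)
    .withFormatFunction(new MyFormatFunction())
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));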