[GO] Sortieren Sie BigQuery-Tabellen nach Daten im Datenfluss

Mit Google Cloud Dataflow können Sie Daten an mehrere Standorte ausgeben, indem Sie die Pipeline verzweigen. Bei dieser Methode muss das Ausgabeziel jedoch im Voraus festgelegt werden, und es ist (wahrscheinlich) nicht möglich, so etwas wie "Ausgabe an eine in den Daten angegebene BigQuery-Tabelle" durchzuführen. Ich suchte nach einem Weg, um eine solche ** dynamische Ausgabeverteilung ** zu erreichen, und als allgemeiner Zweck DynamicDestinations Ich fand, dass ich die Klasse .0 / org / apache /beam / sdk / io / gcp / bigquery / DynamicDestinations.html verwenden sollte.

In diesem Artikel werden wir die Tabellensortierung auf zwei Arten versuchen: die Methode mit diesen DynamicDestinations und die vereinfachte Methode.

Beispielkonfiguration

Da ich diesmal nur an der Ausgabe interessiert bin, werde ich "den von Pub / Sub erhaltenen Text so speichern, wie er in BigQuery ist", ohne zusätzliche Datenverarbeitung. Die Pipeline ist jedoch verzweigt und es werden zwei Arten der Ausgabeverarbeitung hinzugefügt, damit sie mit der Ausgabe einer normalen Einzeltabelle verglichen werden kann.

dd-pipeline.png

Das Folgende ist ein spezifisches Beispiel für die in BigQuery gespeicherten Daten.

$ table="<project_id>:<dataset_id>.<table_id>"
$ bq head -n 3 "${table}"
+-----------+
|   text    |
+-----------+
| Charizard |
| Alakazam  |
| Nidoqueen |
+-----------+
$ bq head -n 3 "${table}_C"
+------------+
|    text    |
+------------+
| Cloyster   |
| Clefairy   |
| Charmander |
+------------+

$ {table} enthält alle Eingabedaten, während $ {table} _C beispielsweise nur den Text enthält, der mit C beginnt. Jede Verteilungstabelle ist ein Code-generierter Name, und zur Laufzeit werden nur die von Ihnen benötigten Namen erstellt.

Code

src/main/java/com/example/dataflow/DynamicDestinationsPipeline.java


package com.example.dataflow;

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

/**
 * An example of BigQuery dynamic destinations.
 *
 * <p>To run this example using managed resource in Google Cloud
 * Platform, you should specify the following command-line options:
 *   --project=<YOUR_PROJECT_ID>
 *   --jobName=<JOB_NAME>
 *   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
 *   --tempLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
 *   --runner=DataflowRunner
 *   --input=<PUBSUB_INPUT_TOPIC>
 *   --output=<BIGQUERY_OUTPUT_TABLE>
 */
public class DynamicDestinationsPipeline {
  /**
   * Add commandline options: input (Pub/Sub topic ID) and output (BigQuery table ID).
   */
  public interface MyOptions extends PipelineOptions {
    @Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
    @Validation.Required
    String getInput();
    void setInput(String value);

    @Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>")
    @Validation.Required
    String getOutput();
    void setOutput(String value);
  }

  /**
   * Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
   * Destination table name will be {@code <table_id>_<initial for input string>}.
   */
  static class MyDestinations extends DynamicDestinations<String, String> {
    private final String tablePrefix;

    MyDestinations(String tableId) {
      tablePrefix = tableId + "_";
    }

    /**
     * Returns a destination table specifier: initial for input string.
     */
    @Override
    public String getDestination(ValueInSingleWindow<String> element) {
      return element.getValue().substring(0, 1);
    }

    /**
     * Returns a TableDestination object for the destination.
     * The table name will be {@code <table_id>_<initial for input string>}.
     */
    @Override
    public TableDestination getTable(String destination) {
      return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
    }

    /**
     * Returns a table schema for the destination.
     * The table has only one column: text as STRING.
     */
    @Override
    public TableSchema getSchema(String destination) {
      List<TableFieldSchema> fields = new ArrayList<>();
      fields.add(new TableFieldSchema().setName("text").setType("STRING"));
      return new TableSchema().setFields(fields);
    }
  }

  /**
   * A formatter to convert an input text to a BigQuery table row.
   */
  static class MyFormatFunction implements SerializableFunction<String, TableRow> {
    @Override
    public TableRow apply(String input) {
      return new TableRow().set("text", input);
    }
  }

  /**
   * Run a pipeline.
   * It reads texts from a Pub/Sub topic and writes them to BigQuery tables.
   * The master table saves all the texts, and other tables save the texts with same initials.
   */
  public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("text").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    Pipeline p = Pipeline.create(options);

    PCollection<String> texts = p.apply("ReadFromPubSub", PubsubIO.readStrings()
        .fromTopic(options.getInput()));
    texts.apply("WriteToBigQuery", BigQueryIO.<String>write()
        .to(options.getOutput())
        .withSchema(schema)
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
        .to(new MyDestinations(options.getOutput()))
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}

[^ invalid-initial]: Der Umgang mit Zeichen, die nicht in Tabellennamen verwendet werden können, wurde übersprungen.


Erstellen Sie vor dem Ausführen ein Pub / Sub-Thema und ein BigQuery-Dataset. Wenn Sie stagingLocation usw. angeben, benötigen Sie auch einen Storage Bucket.

Sie können über die GUI in Pub / Sub veröffentlichen, aber ich habe das folgende Ruby-Skript verwendet, um die Textdatei zu streamen.

publish.rb


require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new(
  project_id: "<project_id>",
  credentials: "credential.json"
)
topic = pubsub.topic "<topic_id>"

while (text = gets)
  topic.publish text.chomp
end
$ head -3 list.txt
Bulbasaur
Ivysaur
Venusaur
$ ruby publish.rb < list.txt

Wo es stecken bleibt

Zuerst habe ich das Schema auch an die Klasse "MyDestinations" übergeben, aber als ich es ausführte, bekam ich eine "IllegalArgumentException". Nach der Suche fand ich Gleicher Fehler und serialisierte. Ich stellte fest, dass ich nicht das, was ich nicht konnte, in die Klasse aufnehmen konnte. Vorläufig habe ich beschlossen, es jedes Mal in getSchema () zu erstellen, um den Fehler zu vermeiden.

Einfache Sortierung

Nicht auf dieses Beispiel beschränkt, kann es Fälle geben, in denen ** Tabellen sortiert sind, das Schema jedoch genau das gleiche ist **. In diesem Fall können Sie "SerializableFunction" in "to ()" angeben, um nur den Tabellennamen dynamisch zu machen (BigQueryIO. (Siehe "Sharding von BigQuery-Ausgabetabellen" in /org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html). Das Schema wird normalerweise mit withSchema () angegeben.

Der Inhalt von "SerializableFunction" implementiert nur den Prozess des Kombinierens von "getDestination ()" und "getTable ()" von "DynamicDestinations". Der folgende Code erstellt eine lokale Variable namens "Ziel", daher sollte es leicht sein, die Entsprechung mit dem vorherigen Code zu verstehen.

  /**
   * Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
   * Destination table name will be {@code <table_id>_<initial for input string>}.
   */
  static class MyDestinationsFunction
      implements SerializableFunction<ValueInSingleWindow<String>, TableDestination> {
    private final String tablePrefix;

    MyDestinationsFunction(String tableId) {
      tablePrefix = tableId + "_";
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<String> input) {
      String destination = input.getValue().substring(0, 1);
      return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
    }
  }

...

    texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
        .to(new MyDestinationsFunction(options.getOutput()))
        .withSchema(schema)
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Was ich von nun an machen möchte

Referenz

Recommended Posts

Sortieren Sie BigQuery-Tabellen nach Daten im Datenfluss
Versuchen Sie, Daten in MongoDB abzulegen
Ermöglichen Sie Jupyter Notebook, Audiodaten zur Wiedergabe in HTML-Tabellen einzubetten
So arbeiten Sie mit BigQuery in Python
Speichern Sie BigQuery-Tabellen mithilfe von Python in GCS
Verwenden Sie Cloud Dataflow, um das Ziel dynamisch entsprechend dem Wert der Daten zu ändern und in GCS zu speichern
SELECT-Daten mithilfe der Client-Bibliothek mit BigQuery
Bücher über Datenwissenschaft, die 2020 gelesen werden sollen
So erstellen Sie Daten für CNN (Chainer)
Lesen von Zeitreihendaten in PyTorch
Ich habe versucht, eine selektive Sortierung in Python zu implementieren
[Python] So sortieren Sie Diktate in Listen und Instanzen in Listen
Überprüfen Sie, ob in Java BigQuery-Tabellen vorhanden sind
Versuchen Sie, die in Firefox gespeicherten Anmeldedaten zu entschlüsseln
Sortieren Sie die Postdaten in umgekehrter Reihenfolge mit Djangos ListView