Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Combine Edition ~

Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Kombinieren ~

Dieser Abschnitt beschreibt die grundlegende Verwendung von Combine, einer der fünf Kerntransformationen von Apache Beam. Die grundlegende Geschichte anderer Kerntransformationen und Apache Beam 2.0.x wird im Folgenden beschrieben.

Apache Beam 2.0.x mit Google Cloud Dataflow - Qiita beginnend mit IntelliJ und Gradle

Einführung in Apache Beam mit Cloud-Datenfluss (über 2.0.0-Serie) ~ Grundlegender Teil ~ ParDo ~ --Qiita

Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Grundlegende Gruppierung nach Schlüssel ~ --Qiita

Dieser Artikel wurde unter Bezugnahme auf die folgenden zwei offiziellen Dokumente verfasst.

Beam Programming Guide

Sammlung und Wert kombinieren|Dokumentation zum Cloud-Datenfluss| Google Cloud Platform

Zwei Rollen von Combine

Kombinieren kombiniert oder führt jedes in der PCollection vorhandene Element (alle Daten) zusammen. Ich erkenne, dass es wie Reduzieren in Karte / Mischen / Reduzieren ist.

Es gibt zwei Hauptmethoden zum Kombinieren. "So kombinieren Sie in einer PCollection vorhandene Elemente, um einen Wert zu generieren" und "So kombinieren Sie jedes Element des Werteteils der PCollection, gruppiert nach Schlüssel, um einen Wert zu generieren" Ist. Im Folgenden möchte ich jede Methode beschreiben.

So kombinieren Sie Elemente, die in einer PC-Sammlung vorhanden sind, um einen Wert zu generieren

So kombinieren Sie in einer PCollection vorhandene Elemente, um einen Wert zu generieren

Kombinieren Sie jedes Element in der PCollection. => Es ist zu beachten, dass sich dies von ParDo unterscheidet. ParDo verarbeitet jedes Element in der PCollection. Kombinieren kombiniert jedes Element in der PCollection.

Dies ist beispielsweise der Fall, wenn in der PCollection vorhandene Elemente kombiniert werden, um einen Wert zu generieren.


PCollection<Integer> sum = pCollection.apply(Sum.integersGlobally());

Auf den ersten Blick scheint Combine nicht zu existieren, aber Sum.integersGlobally () umschließt Combine.globally. Die tatsächliche Summe.integersGlobally () ist unten.


public static Combine.Globally<Integer, Integer> integersGlobally() {
  return Combine.globally(Sum.ofIntegers());}

Referenz API-Referenz

withoutDefaults() Wenn Sie leer zurückgeben möchten, wenn eine leere PCollection als inout angegeben wird, fügen Sie withoutDefaults () hinzu.


PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());

Unterschiede im Verhalten zwischen globalem Fenster und nicht globalem Fenster

Im Fall von Global Window wird standardmäßig eine PCollection zurückgegeben, die ein Element enthält.

Andererseits wird im Fall eines nicht globalen Fensters die obige Standardoperation nicht ausgeführt. Geben Sie die Option an, wenn Sie Kombinieren verwenden. Die Formel war leicht zu verstehen, deshalb zitiere ich sie unten. (Zum Zeitpunkt des Schreibens dieses Beitrags hatte Apache Beam 2.0.x diese Beschreibung noch nicht im Dokument. Google Cloud Dataflow 1.9 wird aus dem offiziellen Dokument zitiert.)

Geben Sie> .withoutDefaults an. In diesem Fall ist das leere Fenster in der Eingabe-PC-Sammlung auch unter Ausgabe> Sammlung leer.

Geben Sie> .asSingletonView an. In diesem Fall wird die Ausgabe sofort in eine PCollectionView konvertiert. Dies ist der Standardwert, wenn jedes leere Fenster als sekundäre Eingabe verwendet wird. Im Allgemeinen sollte diese Option nur verwendet werden, wenn das Ergebnis des Kombinats der Pipeline später als sekundäre Eingabe in der Pipeline verwendet wird.

Quelle zitieren: Sammlung und Wert kombinieren|Dokumentation zum Cloud-Datenfluss| Google Cloud Platform

Ich habe den Code tatsächlich geschrieben

Jeder Prozess wird im Code als Kommentar beschrieben. Ich benutze die Methodenkette nicht so oft wie möglich, um das Verständnis zu priorisieren. Daher ist der Code redundant.

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;

/**
 *Hauptklasse
 */
public class Main {
    /**
     *Funktionsobjekt
     * String =>Führen Sie eine Ganzzahltypkonvertierung durch
     */
    static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //String das Element=>In Integer konvertieren und ausgeben
            c.output(Integer.parseInt(c.element()));
        }
    }

    /**
     *Funktionsobjekt
     * Integer =>Führen Sie eine Typkonvertierung von String durch
     */
    static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //String das Element=>In Integer konvertieren und ausgeben
            System.out.println(c.element());
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     *Eingabedatenpfad
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Aus Datenpfad
     */
    private static final String OUTPUT_FILE_PATH = "./result.txt";

    /**
     *Verwenden Sie zum Verständnis nicht so oft Methodenketten wie möglich
     *Daher gibt es redundante Teile
     *Hauptmethode
     *
     * @param args
     */
    public static void main(String[] args) {
        //Generieren Sie eine Pipeline, indem Sie die Option angeben
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        System.out.println("a");
        //Aus Datei lesen
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
        //Zeichne alle gelesenen Daten auf=>In Integer konvertieren
        PCollection<Integer> integerPCollection = lines.apply(ParDo.of(new TransformTypeFromStringToInteger()));
        // Combine.Fassen Sie jedes Element von PCollection in Global zusammen
        //Für eine leere PC-Sammlung, wenn Sie leer zurückkehren möchten=> PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
        PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
        // PCollection<Integer>Summe Ganzzahl=>In String konvertieren
        PCollection<String> sumString = sum.apply(ParDo.of(new TransformTypeFromIntegerToString()));
        //In Datei schreiben
        sumString.apply(TextIO.write().to(OUTPUT_FILE_PATH));

        //Lauf
        pipeline.run().waitUntilFinish();
    }
}

Ich habe den Code für die Implementierung geschrieben (ver mit Methodenkette)

Es war ziemlich sauber

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;


/**
 *Hauptklasse
 */
public class Main {
    /**
     *Funktionsobjekt
     * String =>Führen Sie eine Ganzzahltypkonvertierung durch
     */
    static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //String das Element=>In Integer konvertieren und ausgeben
            c.output(Integer.parseInt(c.element()));
        }
    }

    /**
     *Funktionsobjekt
     * Integer =>Führen Sie eine Typkonvertierung von String durch
     */
    static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //String das Element=>In Integer konvertieren und ausgeben
            System.out.println(c.element());
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     *Eingabedatenpfad
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Aus Datenpfad
     */
    private static final String OUTPUT_FILE_PATH = "./result.txt";

    /**
     *Hauptmethode
     *
     * @param args
     */
    public static void main(String[] args) {
        //Pipeline-Generierung
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        //Teil bearbeiten
        pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new TransformTypeFromStringToInteger()))
                .apply(Sum.integersGlobally().withoutDefaults())
                .apply(ParDo.of(new TransformTypeFromIntegerToString()))
                .apply(TextIO.write().to(OUTPUT_FILE_PATH));

        //Lauf
        pipeline.run().waitUntilFinish();
    }
}

Geladene Datei

samole.txt


1
2
3
4
5
6
7
8
9
10

Ausführungsergebnis

result.txt-00000-of-00001 wird ausgegeben Der Inhalt von result.txt-00000-of-00001

55

Was machst du

10
Σk
k = 1

Es ist wie es ist.

PerKey Wenn GroupByKey ausgeführt wird, wird es zu K, V (Iterable Collection). Zum Beispiel:

Java [1, 2, 3]

Der PerKey of Combine kombiniert den V [Iterable Collection] -Teil dieser K, V [Iterable Collection] für jeden Schlüssel. Wenn Sie beispielsweise K und V (Iterable Collection) nach GroupByKey oben kombinieren, ist dies wie folgt.

Java [6]

Alle Elemente von V (Iterable Collection) von K und V (Iterable Collection) werden kombiniert.

Ich habe den Code tatsächlich geschrieben

Jeder Prozess wird im Code als Kommentar beschrieben. Ich benutze die Methodenkette nicht so oft wie möglich, um das Verständnis zu priorisieren. Daher ist der Code redundant.

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 *Maine
 */
public class Main {
    /**
     *Funktionsobjekt
     *Gegebener String str,String num","Teilen durch
     *Ändern Sie num in Integer type und KV<String, Integer>Mach eine Form
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        //ProcessContext ist ein Objekt, das die Eingabe darstellt
        //Das Beam SDK übernimmt es für Sie, ohne es selbst definieren zu müssen
        public void processElement(ProcessContext c) {
            // ","Teilen mit
            String[] words = c.element().split(",");
            //Geteiltes Wort[0]Zu K, Worte[1]Zu Integer zu V.
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }


    /**
     *Funktionsobjekt
     * KV<String, Iterable<Integer>Ändern Sie den Typ in String-Typ
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Konvertieren Sie die Eingabe in einen String-Typ
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     *Eingabedatenpfad
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Aus Datenpfad
     */
    private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";

    /**
     *Maine
     *Zum besseren Verständnis verwende ich die Methodenkette nicht so oft wie möglich
     *Daher ist der Code redundant.
     *
     * @param args Argument
     */
    public static void main(String[] args) {
        //Erstellen Sie zunächst eine Option, die in der Pipeline festgelegt werden soll
        //Dieses Mal starten wir es lokal, geben Sie also DirectRunner an.
        //Im lokalen Modus ist DirectRunner bereits die Standardeinstellung, sodass Sie keinen Runner konfigurieren müssen
        PipelineOptions options = PipelineOptionsFactory.create();

        //Pipeline basierend auf Option generieren
        Pipeline pipeline = Pipeline.create(options);

        //Lesen Sie von dort Inout-Daten und PCollection(Ein Datensatz in der Pipeline)Erschaffen
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        //Gegebener String str,String num","Teilen Sie mit, ändern Sie num in Integer type, KV<String, Integer>Mach eine Form
        PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));

        //PerKey kombinieren führt im Rahmen der Operation eine GroupByKey-Konvertierung durch
        PCollection<KV<String, Integer>> sumPerKey = kvCounter
                .apply(Sum.integersPerKey());
        
        //Konvertieren Sie PCollection in ein Formular, das in eine Datei ausgegeben werden kann
        PCollection<String> output = sumPerKey.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));

        //Schreiben
        output.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));

        // run :Mit Runner ausführen, der durch die Option PipeLine angegeben wird
        // waitUntilFinish :Warten Sie, bis PipeLine fertig ist, und geben Sie den endgültigen Status zurück
        pipeline.run().waitUntilFinish();
    }


}

Ich habe den Code für die Implementierung geschrieben (ver mit Methodenkette)

Es war ziemlich sauber

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;

/**
 *Maine
 */
public class Main {
    /**
     *Funktionsobjekt
     *Gegebener String str,String num","Teilen durch
     *Ändern Sie num in Integer type und KV<String, Integer>Mach eine Form
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        //ProcessContext ist ein Objekt, das die Eingabe darstellt
        //Das Beam SDK übernimmt es für Sie, ohne es selbst definieren zu müssen
        public void processElement(ProcessContext c) {
            // ","Teilen mit
            String[] words = c.element().split(",");
            //Geteiltes Wort[0]Zu K, Worte[1]Zu Integer zu V.
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }


    /**
     *Funktionsobjekt
     * KV<String, Iterable<Integer>Ändern Sie den Typ in String-Typ
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Konvertieren Sie die Eingabe in einen String-Typ
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     *Eingabedatenpfad
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Aus Datenpfad
     */
    private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";

    /**
     *Maine
     * @param args Argument
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
        pipeline
                .apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new SplitWordsAndMakeKVFn()))
                .apply(Sum.integersPerKey())
                .apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
                .apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
        pipeline.run().waitUntilFinish();
    }


}

Geladene Datei

samole.txt


Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

Ausführungsergebnis

Die folgenden drei Dateien werden generiert. result.csv-00000-of-00003 result.csv-00001-of-00003 result.csv-00002-of-00003

Der Inhalt jeder Datei ist wie folgt. Da die Verarbeitung durch verteilte Parallelverarbeitung erfolgt, ist der Inhalt, der in welche Datei ausgegeben wird, jedes Mal zufällig.

result.csv-00000-of-00003

KV{Python, 13}

result.csv-00001-of-00003

KV{Java, 6}

result.csv-00002-of-00003

KV{Go, 17}

In Verbindung stehender Artikel

Apache Beam 2.0.x mit Google Cloud Dataflow - Qiita beginnend mit IntelliJ und Gradle

Einführung in Apache Beam mit Cloud-Datenfluss (über 2.0.0-Serie) ~ Grundlegender Teil ~ ParDo ~ --Qiita

Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Grundlegende Gruppierung nach Schlüssel ~ --Qiita

Die Seite, die ich als Referenz verwendet habe

Beam Programming Guide Sammlung und Wert kombinieren|Dokumentation zum Cloud-Datenfluss| Google Cloud Platform API-Referenz

Recommended Posts

Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Combine Edition ~
Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Basic Group By Key ~
Apache Beam 2.0.x mit Google Cloud Dataflow beginnend mit IntelliJ und Gradle
Apache Beam (Datenfluss) Praktische Einführung [Python]
Materialien zum Lesen, wenn Sie mit Apache Beam beginnen
Laden Sie Dateien mit Django-Speicher in Google Cloud Storages hoch und löschen Sie sie
Einführung in RDB mit sqlalchemy Ⅰ
Was ist Google Cloud Dataflow?
Serie: Einführung in den Inhalt von cx_Oracle
Änderungen an "Ausführen von Cloud Dataflow + Cloud Pub / Sub + Fluentd ..." mit SDK 2.1
Herstellen einer Verbindung zum Cloud Firestore über Google Cloud-Funktionen mit Python-Code