Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Basic Group By Key ~

Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Basic Group By Key ~

Dieser Abschnitt beschreibt die grundlegende Verwendung von GroupByKey, einer der fünf Kerntransformationen von Apache Beam. Ich wünschte, ich könnte zu einem anderen Zeitpunkt über CoGroupByKey usw. schreiben.

Klicken Sie hier, um die Grundlagen von Apache Beam und Cloud Dataflow zu erfahren (http://qiita.com/Sekky0905/items/381ed27fca9a16f8ef07).

Ich habe es unter Bezugnahme auf Official Beam Programming Guide geschrieben.

Was ist GroupByKey?

Parallele Reduktionsvorgänge. Mische im Map / Shuffle / Reduce-Stil. GroupByKey ist, wie der Name schon sagt, eine Kerntransformation, die "eine Sammlung nach Schlüssel gruppiert". Erstellen Sie eine neue Sammlung, indem Sie die Schlüsselwertsammlung mit mehreren Paaren mit demselben Schlüssel, aber unterschiedlichen Werten kombinieren. Nützlich zum Aggregieren von Daten mit einem gemeinsamen Schlüssel.

Multimap und Uni-Map

multimap Angenommen, Sie haben die Schlüssel Java, Python und Go. Jeder der mehreren Tasten wird ein Wert zugewiesen. Diese Karte vor der Konvertierung wird als Multimap bezeichnet.

Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

uni-map Das Anwenden von GroupByKey auf die oben genannte Schlüsselwert-Multimap-Sammlung führt zu den folgenden Ergebnissen.

Java [1, 6, 8]
Python [2, 7]
Go[7, 8]

Nach der Konvertierung wird diese Karte als Uni-Karte bezeichnet. Eine Karte einer Sammlung von Zahlen wird den eindeutigen Tasten Java, Python und Go zugewiesen.

Darstellung des für Beam SDK für Java typischen Schlüsselwerts

In Beam SDK für Java wird der Schlüsselwert anders ausgedrückt als in normalem Java. Stellt ein Schlüsselwertobjekt vom Typ KV <K, V> dar.

Ich habe den Code tatsächlich geschrieben

Datei zum Lesen

sample.txt


Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

Tatsächlicher Java-Code

Jeder Prozess wird im Code als Kommentar beschrieben. Ich benutze die Methodenkette nicht so oft wie möglich, um das Verständnis zu priorisieren. Daher ist der Code redundant.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;


/**
 *Maine
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {
    /**
     *Funktionsobjekt
     *Gegebener String str,String num","Teilen durch
     *Ändern Sie num in Integer type und KV<String, Integer>Mach eine Form
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        //ProcessContext ist ein Objekt, das die Eingabe darstellt
        //Das Beam SDK übernimmt es für Sie, ohne es selbst definieren zu müssen
        public void processElement(ProcessContext c) {
            // ","Teilen mit
            String[] words = c.element().split(",");
            //Geteiltes Wort[0]Zu K, Worte[1]Zu Integer zu V.
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     *Funktionsobjekt
     * KV<String, Iterable<Integer>Ändern Sie den Typ in String-Typ
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Konvertieren Sie die Eingabe in einen String-Typ
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     *Eingabedatenpfad
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Aus Datenpfad
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     *Maine
     *Zum besseren Verständnis verwende ich die Methodenkette nicht so oft wie möglich
     *Daher ist der Code redundant.
     *
     * @param args Argument
     */
    public static void main(String[] args) {
        //Erstellen Sie zunächst eine Option, die in der Pipeline festgelegt werden soll
        //Dieses Mal starten wir es lokal, geben Sie also DirectRunner an.
        //Im lokalen Modus ist DirectRunner bereits die Standardeinstellung, sodass Sie keinen Runner konfigurieren müssen
        PipelineOptions options = PipelineOptionsFactory.create();

        //Pipeline basierend auf Option generieren
        Pipeline pipeline = Pipeline.create(options);

        //Lesen Sie von dort Inout-Daten und PCollection(Ein Datensatz in der Pipeline)Erschaffen
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        //Gegebener String str,String num","Teilen Sie mit, ändern Sie num in Integer type, KV<String, Integer>Mach eine Form
        PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));

        //Mit GroupByKey,{Go, [2, 9, 1, 5]}Form wie
               // GroupByKey.<K, V>create())Mit GroupByKey<K, V>Generiert
        PCollection<KV<String, Iterable<Integer>>> groupedWords = kvCounter.apply(
                GroupByKey.<String, Integer>create());

        //Zur Ausgabe<KV<String, Iterable<Integer>>>Konvertieren vom Typ in den String-Typ
        PCollection<String> output = groupedWords.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));

        //Schreiben
        output.apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run :Mit dem durch die PipeLine-Option angegebenen Runner ausführen
        // waitUntilFinish :Warten Sie, bis PipeLine fertig ist, und geben Sie den endgültigen Status zurück
        pipeline.run().waitUntilFinish();
    }
}

Übrigens sieht es mit der Methodenkette so aus. Es war ziemlich ordentlich.


import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;


/**
 *Maine
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {
    /**
     *Funktionsobjekt
     *Gegebener String str,String num","Teilen durch
     *Ändern Sie num in Integer type und KV<String, Integer>Mach eine Form
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        //ProcessContext ist ein Objekt, das die Eingabe darstellt
        //Das Beam SDK übernimmt es für Sie, ohne es selbst definieren zu müssen
        public void processElement(ProcessContext c) {
            // ","Teilen mit
            String[] words = c.element().split(",");
            //Geteiltes Wort[0]Zu K, Worte[1]Zu Integer zu V.
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     *Funktionsobjekt
     * KV<String, Iterable<Integer>Ändern Sie den Typ in String-Typ
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Konvertieren Sie die Eingabe in einen String-Typ
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     *Eingabedatenpfad
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Aus Datenpfad
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     *Maine
     *Zum besseren Verständnis verwende ich die Methodenkette nicht so oft wie möglich
     *Daher ist der Code redundant.
     *
     * @param args Argument
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        //Wie schreibe ich mit Methodenkette
        pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new SplitWordsAndMakeKVFn()))
                .apply(GroupByKey.<String, Integer>create())
                .apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
                .apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run :Mit dem durch die PipeLine-Option angegebenen Runner ausführen
        // waitUntilFinish :Warten Sie, bis PipeLine fertig ist, und geben Sie den endgültigen Status zurück
        pipeline.run().waitUntilFinish();
    }
}

Ausführungsergebnis

Die folgenden drei Dateien werden generiert. result.csv-00000-of-00003 result.csv-00001-of-00003 result.csv-00002-of-00003

Der Inhalt jeder Datei ist wie folgt. Da die Verarbeitung durch verteilte Parallelverarbeitung erfolgt, gibt es Dateien mit leeren Inhalten und einem oder zwei Inhalten. Außerdem ist jedes Mal, welcher Inhalt in welche Datei ausgegeben wird, zufällig.

result.csv-00000-of-00003 Kein Inhalt

result.csv-00001-of-00003

KV{Java, [1, 3, 2]}

result.csv-00002-of-00003

KV{Go, [5, 2, 9, 1]}
KV{Python, [5, 2, 6]}

In Verbindung stehender Artikel

Einführung in Apache Beam mit Cloud-Datenfluss (über 2.0.0-Serie) ~ Grundlegender Teil ~ Von ParDo ~ --Qiita

Apache Beam 2.0.x mit Google Cloud Dataflow - Qiita beginnend mit IntelliJ und Gradle

Die Seite, die ich als Referenz verwendet habe

Beam Programming Guide

Mit GroupByKey kombinieren|Dokumentation zum Cloud-Datenfluss|  Google Cloud Platform

Recommended Posts

Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Basic Group By Key ~
Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Combine Edition ~
Einführung in Apache Beam mit Cloud-Datenfluss (über 2.0.0-Serie) ~ Grundlegender Teil ~ ParDo ~
Apache Beam 2.0.x mit Google Cloud Dataflow beginnend mit IntelliJ und Gradle
Apache Beam (Datenfluss) Praktische Einführung [Python]