Dieser Abschnitt beschreibt die grundlegende Verwendung von GroupByKey, einer der fünf Kerntransformationen von Apache Beam. Ich wünschte, ich könnte zu einem anderen Zeitpunkt über CoGroupByKey usw. schreiben.
Klicken Sie hier, um die Grundlagen von Apache Beam und Cloud Dataflow zu erfahren (http://qiita.com/Sekky0905/items/381ed27fca9a16f8ef07).
Ich habe es unter Bezugnahme auf Official Beam Programming Guide geschrieben.
Parallele Reduktionsvorgänge. Mische im Map / Shuffle / Reduce-Stil. GroupByKey ist, wie der Name schon sagt, eine Kerntransformation, die "eine Sammlung nach Schlüssel gruppiert". Erstellen Sie eine neue Sammlung, indem Sie die Schlüsselwertsammlung mit mehreren Paaren mit demselben Schlüssel, aber unterschiedlichen Werten kombinieren. Nützlich zum Aggregieren von Daten mit einem gemeinsamen Schlüssel.
multimap Angenommen, Sie haben die Schlüssel Java, Python und Go. Jeder der mehreren Tasten wird ein Wert zugewiesen. Diese Karte vor der Konvertierung wird als Multimap bezeichnet.
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
uni-map Das Anwenden von GroupByKey auf die oben genannte Schlüsselwert-Multimap-Sammlung führt zu den folgenden Ergebnissen.
Java [1, 6, 8]
Python [2, 7]
Go[7, 8]
Nach der Konvertierung wird diese Karte als Uni-Karte bezeichnet. Eine Karte einer Sammlung von Zahlen wird den eindeutigen Tasten Java, Python und Go zugewiesen.
In Beam SDK für Java wird der Schlüsselwert anders ausgedrückt als in normalem Java. Stellt ein Schlüsselwertobjekt vom Typ KV <K, V> dar.
sample.txt
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
Jeder Prozess wird im Code als Kommentar beschrieben. Ich benutze die Methodenkette nicht so oft wie möglich, um das Verständnis zu priorisieren. Daher ist der Code redundant.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
*Maine
* Created by sekiguchikai on 2017/07/12.
*/
public class Main {
/**
*Funktionsobjekt
*Gegebener String str,String num","Teilen durch
*Ändern Sie num in Integer type und KV<String, Integer>Mach eine Form
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
//ProcessContext ist ein Objekt, das die Eingabe darstellt
//Das Beam SDK übernimmt es für Sie, ohne es selbst definieren zu müssen
public void processElement(ProcessContext c) {
// ","Teilen mit
String[] words = c.element().split(",");
//Geteiltes Wort[0]Zu K, Worte[1]Zu Integer zu V.
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
*Funktionsobjekt
* KV<String, Iterable<Integer>Ändern Sie den Typ in String-Typ
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//Konvertieren Sie die Eingabe in einen String-Typ
c.output(String.valueOf(c.element()));
}
}
/**
*Eingabedatenpfad
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Aus Datenpfad
*/
private static final String OUTPUT_FILE_PATH = "./result.csv";
/**
*Maine
*Zum besseren Verständnis verwende ich die Methodenkette nicht so oft wie möglich
*Daher ist der Code redundant.
*
* @param args Argument
*/
public static void main(String[] args) {
//Erstellen Sie zunächst eine Option, die in der Pipeline festgelegt werden soll
//Dieses Mal starten wir es lokal, geben Sie also DirectRunner an.
//Im lokalen Modus ist DirectRunner bereits die Standardeinstellung, sodass Sie keinen Runner konfigurieren müssen
PipelineOptions options = PipelineOptionsFactory.create();
//Pipeline basierend auf Option generieren
Pipeline pipeline = Pipeline.create(options);
//Lesen Sie von dort Inout-Daten und PCollection(Ein Datensatz in der Pipeline)Erschaffen
PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
//Gegebener String str,String num","Teilen Sie mit, ändern Sie num in Integer type, KV<String, Integer>Mach eine Form
PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));
//Mit GroupByKey,{Go, [2, 9, 1, 5]}Form wie
// GroupByKey.<K, V>create())Mit GroupByKey<K, V>Generiert
PCollection<KV<String, Iterable<Integer>>> groupedWords = kvCounter.apply(
GroupByKey.<String, Integer>create());
//Zur Ausgabe<KV<String, Iterable<Integer>>>Konvertieren vom Typ in den String-Typ
PCollection<String> output = groupedWords.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));
//Schreiben
output.apply(TextIO.write().to(OUTPUT_FILE_PATH));
// run :Mit dem durch die PipeLine-Option angegebenen Runner ausführen
// waitUntilFinish :Warten Sie, bis PipeLine fertig ist, und geben Sie den endgültigen Status zurück
pipeline.run().waitUntilFinish();
}
}
Übrigens sieht es mit der Methodenkette so aus. Es war ziemlich ordentlich.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
/**
*Maine
* Created by sekiguchikai on 2017/07/12.
*/
public class Main {
/**
*Funktionsobjekt
*Gegebener String str,String num","Teilen durch
*Ändern Sie num in Integer type und KV<String, Integer>Mach eine Form
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
//ProcessContext ist ein Objekt, das die Eingabe darstellt
//Das Beam SDK übernimmt es für Sie, ohne es selbst definieren zu müssen
public void processElement(ProcessContext c) {
// ","Teilen mit
String[] words = c.element().split(",");
//Geteiltes Wort[0]Zu K, Worte[1]Zu Integer zu V.
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
*Funktionsobjekt
* KV<String, Iterable<Integer>Ändern Sie den Typ in String-Typ
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//Konvertieren Sie die Eingabe in einen String-Typ
c.output(String.valueOf(c.element()));
}
}
/**
*Eingabedatenpfad
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Aus Datenpfad
*/
private static final String OUTPUT_FILE_PATH = "./result.csv";
/**
*Maine
*Zum besseren Verständnis verwende ich die Methodenkette nicht so oft wie möglich
*Daher ist der Code redundant.
*
* @param args Argument
*/
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
//Wie schreibe ich mit Methodenkette
pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
.apply(ParDo.of(new SplitWordsAndMakeKVFn()))
.apply(GroupByKey.<String, Integer>create())
.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
.apply(TextIO.write().to(OUTPUT_FILE_PATH));
// run :Mit dem durch die PipeLine-Option angegebenen Runner ausführen
// waitUntilFinish :Warten Sie, bis PipeLine fertig ist, und geben Sie den endgültigen Status zurück
pipeline.run().waitUntilFinish();
}
}
Die folgenden drei Dateien werden generiert. result.csv-00000-of-00003 result.csv-00001-of-00003 result.csv-00002-of-00003
Der Inhalt jeder Datei ist wie folgt. Da die Verarbeitung durch verteilte Parallelverarbeitung erfolgt, gibt es Dateien mit leeren Inhalten und einem oder zwei Inhalten. Außerdem ist jedes Mal, welcher Inhalt in welche Datei ausgegeben wird, zufällig.
result.csv-00000-of-00003 Kein Inhalt
result.csv-00001-of-00003
KV{Java, [1, 3, 2]}
result.csv-00002-of-00003
KV{Go, [5, 2, 9, 1]}
KV{Python, [5, 2, 6]}
Apache Beam 2.0.x mit Google Cloud Dataflow - Qiita beginnend mit IntelliJ und Gradle
Mit GroupByKey kombinieren|Dokumentation zum Cloud-Datenfluss| Google Cloud Platform
Recommended Posts