Dieser Abschnitt beschreibt die grundlegende Verwendung von Combine, einer der fünf Kerntransformationen von Apache Beam. Die grundlegende Geschichte anderer Kerntransformationen und Apache Beam 2.0.x wird im Folgenden beschrieben.
Apache Beam 2.0.x mit Google Cloud Dataflow - Qiita beginnend mit IntelliJ und Gradle
Dieser Artikel wurde unter Bezugnahme auf die folgenden zwei offiziellen Dokumente verfasst.
Sammlung und Wert kombinieren|Dokumentation zum Cloud-Datenfluss| Google Cloud Platform
Kombinieren kombiniert oder führt jedes in der PCollection vorhandene Element (alle Daten) zusammen. Ich erkenne, dass es wie Reduzieren in Karte / Mischen / Reduzieren ist.
Es gibt zwei Hauptmethoden zum Kombinieren. "So kombinieren Sie in einer PCollection vorhandene Elemente, um einen Wert zu generieren" und "So kombinieren Sie jedes Element des Werteteils der PCollection, gruppiert nach Schlüssel, um einen Wert zu generieren" Ist. Im Folgenden möchte ich jede Methode beschreiben.
Kombinieren Sie jedes Element in der PCollection. => Es ist zu beachten, dass sich dies von ParDo unterscheidet. ParDo verarbeitet jedes Element in der PCollection. Kombinieren kombiniert jedes Element in der PCollection.
Dies ist beispielsweise der Fall, wenn in der PCollection vorhandene Elemente kombiniert werden, um einen Wert zu generieren.
PCollection<Integer> sum = pCollection.apply(Sum.integersGlobally());
Auf den ersten Blick scheint Combine nicht zu existieren, aber Sum.integersGlobally () umschließt Combine.globally. Die tatsächliche Summe.integersGlobally () ist unten.
public static Combine.Globally<Integer, Integer> integersGlobally() {
return Combine.globally(Sum.ofIntegers());}
Referenz API-Referenz
withoutDefaults() Wenn Sie leer zurückgeben möchten, wenn eine leere PCollection als inout angegeben wird, fügen Sie withoutDefaults () hinzu.
PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
Im Fall von Global Window wird standardmäßig eine PCollection zurückgegeben, die ein Element enthält.
Andererseits wird im Fall eines nicht globalen Fensters die obige Standardoperation nicht ausgeführt. Geben Sie die Option an, wenn Sie Kombinieren verwenden. Die Formel war leicht zu verstehen, deshalb zitiere ich sie unten. (Zum Zeitpunkt des Schreibens dieses Beitrags hatte Apache Beam 2.0.x diese Beschreibung noch nicht im Dokument. Google Cloud Dataflow 1.9 wird aus dem offiziellen Dokument zitiert.)
Geben Sie> .withoutDefaults an. In diesem Fall ist das leere Fenster in der Eingabe-PC-Sammlung auch unter Ausgabe> Sammlung leer.
Geben Sie> .asSingletonView an. In diesem Fall wird die Ausgabe sofort in eine PCollectionView konvertiert. Dies ist der Standardwert, wenn jedes leere Fenster als sekundäre Eingabe verwendet wird. Im Allgemeinen sollte diese Option nur verwendet werden, wenn das Ergebnis des Kombinats der Pipeline später als sekundäre Eingabe in der Pipeline verwendet wird.
Quelle zitieren: Sammlung und Wert kombinieren|Dokumentation zum Cloud-Datenfluss| Google Cloud Platform
Jeder Prozess wird im Code als Kommentar beschrieben. Ich benutze die Methodenkette nicht so oft wie möglich, um das Verständnis zu priorisieren. Daher ist der Code redundant.
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
/**
*Hauptklasse
*/
public class Main {
/**
*Funktionsobjekt
* String =>Führen Sie eine Ganzzahltypkonvertierung durch
*/
static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
//String das Element=>In Integer konvertieren und ausgeben
c.output(Integer.parseInt(c.element()));
}
}
/**
*Funktionsobjekt
* Integer =>Führen Sie eine Typkonvertierung von String durch
*/
static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//String das Element=>In Integer konvertieren und ausgeben
System.out.println(c.element());
c.output(String.valueOf(c.element()));
}
}
/**
*Eingabedatenpfad
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Aus Datenpfad
*/
private static final String OUTPUT_FILE_PATH = "./result.txt";
/**
*Verwenden Sie zum Verständnis nicht so oft Methodenketten wie möglich
*Daher gibt es redundante Teile
*Hauptmethode
*
* @param args
*/
public static void main(String[] args) {
//Generieren Sie eine Pipeline, indem Sie die Option angeben
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
System.out.println("a");
//Aus Datei lesen
PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
//Zeichne alle gelesenen Daten auf=>In Integer konvertieren
PCollection<Integer> integerPCollection = lines.apply(ParDo.of(new TransformTypeFromStringToInteger()));
// Combine.Fassen Sie jedes Element von PCollection in Global zusammen
//Für eine leere PC-Sammlung, wenn Sie leer zurückkehren möchten=> PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
// PCollection<Integer>Summe Ganzzahl=>In String konvertieren
PCollection<String> sumString = sum.apply(ParDo.of(new TransformTypeFromIntegerToString()));
//In Datei schreiben
sumString.apply(TextIO.write().to(OUTPUT_FILE_PATH));
//Lauf
pipeline.run().waitUntilFinish();
}
}
Es war ziemlich sauber
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
/**
*Hauptklasse
*/
public class Main {
/**
*Funktionsobjekt
* String =>Führen Sie eine Ganzzahltypkonvertierung durch
*/
static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
//String das Element=>In Integer konvertieren und ausgeben
c.output(Integer.parseInt(c.element()));
}
}
/**
*Funktionsobjekt
* Integer =>Führen Sie eine Typkonvertierung von String durch
*/
static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//String das Element=>In Integer konvertieren und ausgeben
System.out.println(c.element());
c.output(String.valueOf(c.element()));
}
}
/**
*Eingabedatenpfad
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Aus Datenpfad
*/
private static final String OUTPUT_FILE_PATH = "./result.txt";
/**
*Hauptmethode
*
* @param args
*/
public static void main(String[] args) {
//Pipeline-Generierung
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
//Teil bearbeiten
pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
.apply(ParDo.of(new TransformTypeFromStringToInteger()))
.apply(Sum.integersGlobally().withoutDefaults())
.apply(ParDo.of(new TransformTypeFromIntegerToString()))
.apply(TextIO.write().to(OUTPUT_FILE_PATH));
//Lauf
pipeline.run().waitUntilFinish();
}
}
samole.txt
1
2
3
4
5
6
7
8
9
10
result.txt-00000-of-00001 wird ausgegeben Der Inhalt von result.txt-00000-of-00001
55
Was machst du
10
Σk
k = 1
Es ist wie es ist.
PerKey Wenn GroupByKey ausgeführt wird, wird es zu K, V (Iterable Collection). Zum Beispiel:
Java [1, 2, 3]
Der PerKey of Combine kombiniert den V [Iterable Collection] -Teil dieser K, V [Iterable Collection] für jeden Schlüssel. Wenn Sie beispielsweise K und V (Iterable Collection) nach GroupByKey oben kombinieren, ist dies wie folgt.
Java [6]
Alle Elemente von V (Iterable Collection) von K und V (Iterable Collection) werden kombiniert.
Jeder Prozess wird im Code als Kommentar beschrieben. Ich benutze die Methodenkette nicht so oft wie möglich, um das Verständnis zu priorisieren. Daher ist der Code redundant.
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
*Maine
*/
public class Main {
/**
*Funktionsobjekt
*Gegebener String str,String num","Teilen durch
*Ändern Sie num in Integer type und KV<String, Integer>Mach eine Form
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
//ProcessContext ist ein Objekt, das die Eingabe darstellt
//Das Beam SDK übernimmt es für Sie, ohne es selbst definieren zu müssen
public void processElement(ProcessContext c) {
// ","Teilen mit
String[] words = c.element().split(",");
//Geteiltes Wort[0]Zu K, Worte[1]Zu Integer zu V.
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
*Funktionsobjekt
* KV<String, Iterable<Integer>Ändern Sie den Typ in String-Typ
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//Konvertieren Sie die Eingabe in einen String-Typ
c.output(String.valueOf(c.element()));
}
}
/**
*Eingabedatenpfad
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Aus Datenpfad
*/
private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";
/**
*Maine
*Zum besseren Verständnis verwende ich die Methodenkette nicht so oft wie möglich
*Daher ist der Code redundant.
*
* @param args Argument
*/
public static void main(String[] args) {
//Erstellen Sie zunächst eine Option, die in der Pipeline festgelegt werden soll
//Dieses Mal starten wir es lokal, geben Sie also DirectRunner an.
//Im lokalen Modus ist DirectRunner bereits die Standardeinstellung, sodass Sie keinen Runner konfigurieren müssen
PipelineOptions options = PipelineOptionsFactory.create();
//Pipeline basierend auf Option generieren
Pipeline pipeline = Pipeline.create(options);
//Lesen Sie von dort Inout-Daten und PCollection(Ein Datensatz in der Pipeline)Erschaffen
PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
//Gegebener String str,String num","Teilen Sie mit, ändern Sie num in Integer type, KV<String, Integer>Mach eine Form
PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));
//PerKey kombinieren führt im Rahmen der Operation eine GroupByKey-Konvertierung durch
PCollection<KV<String, Integer>> sumPerKey = kvCounter
.apply(Sum.integersPerKey());
//Konvertieren Sie PCollection in ein Formular, das in eine Datei ausgegeben werden kann
PCollection<String> output = sumPerKey.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));
//Schreiben
output.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
// run :Mit Runner ausführen, der durch die Option PipeLine angegeben wird
// waitUntilFinish :Warten Sie, bis PipeLine fertig ist, und geben Sie den endgültigen Status zurück
pipeline.run().waitUntilFinish();
}
}
Es war ziemlich sauber
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
/**
*Maine
*/
public class Main {
/**
*Funktionsobjekt
*Gegebener String str,String num","Teilen durch
*Ändern Sie num in Integer type und KV<String, Integer>Mach eine Form
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
//ProcessContext ist ein Objekt, das die Eingabe darstellt
//Das Beam SDK übernimmt es für Sie, ohne es selbst definieren zu müssen
public void processElement(ProcessContext c) {
// ","Teilen mit
String[] words = c.element().split(",");
//Geteiltes Wort[0]Zu K, Worte[1]Zu Integer zu V.
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
*Funktionsobjekt
* KV<String, Iterable<Integer>Ändern Sie den Typ in String-Typ
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//Konvertieren Sie die Eingabe in einen String-Typ
c.output(String.valueOf(c.element()));
}
}
/**
*Eingabedatenpfad
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Aus Datenpfad
*/
private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";
/**
*Maine
* @param args Argument
*/
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
pipeline
.apply(TextIO.read().from(INPUT_FILE_PATH))
.apply(ParDo.of(new SplitWordsAndMakeKVFn()))
.apply(Sum.integersPerKey())
.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
pipeline.run().waitUntilFinish();
}
}
samole.txt
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
Die folgenden drei Dateien werden generiert. result.csv-00000-of-00003 result.csv-00001-of-00003 result.csv-00002-of-00003
Der Inhalt jeder Datei ist wie folgt. Da die Verarbeitung durch verteilte Parallelverarbeitung erfolgt, ist der Inhalt, der in welche Datei ausgegeben wird, jedes Mal zufällig.
result.csv-00000-of-00003
KV{Python, 13}
result.csv-00001-of-00003
KV{Java, 6}
result.csv-00002-of-00003
KV{Go, 17}
Apache Beam 2.0.x mit Google Cloud Dataflow - Qiita beginnend mit IntelliJ und Gradle
Beam Programming Guide Sammlung und Wert kombinieren|Dokumentation zum Cloud-Datenfluss| Google Cloud Platform API-Referenz
Recommended Posts