Cette section décrit l'utilisation de base de GroupByKey, l'une des cinq principales transformations d'Apache Beam. J'aimerais pouvoir écrire sur CoGroupByKey etc. à un autre moment.
Pour connaître les bases d'Apache Beam et de Cloud Dataflow, cliquez ici (http://qiita.com/Sekky0905/items/381ed27fca9a16f8ef07)
Je l'ai écrit en référence au Official Beam Programming Guide.
Opérations de réduction parallèles. Mélanger dans le style Map / Shuffle / Réduire GroupByKey, comme son nom l'indique, est une transformation principale qui «regroupe une collection par clé». Créez une nouvelle collection en combinant la collection clé-valeur avec plusieurs paires avec la même clé mais des valeurs différentes. Utile pour agréger des données ayant une clé commune.
multimap Par exemple, supposons que vous ayez les clés Java, Python et Go. Une valeur est attribuée à chacune de la pluralité de clés. Cette carte avant la conversion est appelée multimap.
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
uni-map L'application de GroupByKey à la collection multimap clé-valeur ci-dessus donne les résultats suivants.
Java [1, 6, 8]
Python [2, 7]
Go[7, 8]
Après conversion, cette carte est appelée uni-map. Une carte d'une collection de nombres est affectée aux clés uniques Java, Python et Go.
Dans Beam SDK pour Java, la valeur-clé est exprimée différemment de Java normal. Représente un objet clé-valeur de type KV <K, V>.
sample.txt
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
Chaque processus est décrit comme un commentaire dans le code. Je n'utilise pas autant que possible la chaîne de méthodes pour prioriser la compréhension. Par conséquent, le code est redondant.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
*Principale
* Created by sekiguchikai on 2017/07/12.
*/
public class Main {
/**
*Objet de fonction
*Donnée String str,Numéro de chaîne","Diviser par
*Changer num en type entier et KV<String, Integer>Faire un moule
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
//ProcessContext est un objet qui représente l'entrée
//Beam SDK le récupérera pour vous sans avoir à le définir vous-même
public void processElement(ProcessContext c) {
// ","Split avec
String[] words = c.element().split(",");
//Mot divisé[0]À K, mots[1]Vers un entier vers V
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
*Objet de fonction
* KV<String, Iterable<Integer>Changer le type en type String
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//Convertir l'entrée en type String
c.output(String.valueOf(c.element()));
}
}
/**
*Chemin des données d'entrée
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Chemin de données de sortie
*/
private static final String OUTPUT_FILE_PATH = "./result.csv";
/**
*Principale
*Par souci de compréhension, je n'utilise pas autant que possible la chaîne de méthodes
*Par conséquent, le code est redondant.
*
* @argument param args
*/
public static void main(String[] args) {
//Commencez par créer une option à définir dans Pipeline
//Cette fois, nous allons le démarrer localement, alors spécifiez DirectRunner.
//En mode local, DirectRunner est déjà la valeur par défaut, vous n'avez donc pas besoin de configurer un coureur
PipelineOptions options = PipelineOptionsFactory.create();
//Générer un pipeline en fonction de l'option
Pipeline pipeline = Pipeline.create(options);
//Lire les données d'entrée et PCollection à partir de là(Un ensemble de données dans le pipeline)Créer
PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
//Donnée String str,Numéro de chaîne","Diviser avec, changer num en type entier, KV<String, Integer>Faire un moule
PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));
//Avec GroupByKey,{Go, [2, 9, 1, 5]}Forme comme
// GroupByKey.<K, V>create())Avec GroupByKey<K, V>Génère
PCollection<KV<String, Iterable<Integer>>> groupedWords = kvCounter.apply(
GroupByKey.<String, Integer>create());
//Pour la sortie<KV<String, Iterable<Integer>>>Conversion de type en type chaîne
PCollection<String> output = groupedWords.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));
//Écrire
output.apply(TextIO.write().to(OUTPUT_FILE_PATH));
// run :Exécuter avec Runner spécifié par l'option PipeLine
// waitUntilFinish :Attendez que PipeLine se termine et retourne l'état final
pipeline.run().waitUntilFinish();
}
}
Au fait, en utilisant la chaîne de méthodes, cela ressemble à ceci. C'était plutôt chouette.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
/**
*Principale
* Created by sekiguchikai on 2017/07/12.
*/
public class Main {
/**
*Objet de fonction
*Donnée String str,Numéro de chaîne","Diviser par
*Changer num en type entier et KV<String, Integer>Faire un moule
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
//ProcessContext est un objet qui représente l'entrée
//Beam SDK le récupérera pour vous sans avoir à le définir vous-même
public void processElement(ProcessContext c) {
// ","Split avec
String[] words = c.element().split(",");
//Mot divisé[0]À K, mots[1]Vers un entier vers V
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
*Objet de fonction
* KV<String, Iterable<Integer>Changer le type en type String
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//Convertir l'entrée en type String
c.output(String.valueOf(c.element()));
}
}
/**
*Chemin des données d'entrée
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Chemin de données de sortie
*/
private static final String OUTPUT_FILE_PATH = "./result.csv";
/**
*Principale
*Par souci de compréhension, je n'utilise pas autant que possible la chaîne de méthodes
*Par conséquent, le code est redondant.
*
* @argument param args
*/
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
//Comment écrire en utilisant la chaîne de méthodes
pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
.apply(ParDo.of(new SplitWordsAndMakeKVFn()))
.apply(GroupByKey.<String, Integer>create())
.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
.apply(TextIO.write().to(OUTPUT_FILE_PATH));
// run :Exécuter avec Runner spécifié par l'option PipeLine
// waitUntilFinish :Attendez que PipeLine se termine et retourne l'état final
pipeline.run().waitUntilFinish();
}
}
Les trois fichiers suivants sont générés. result.csv-00000-of-00003 result.csv-00001-of-00003 result.csv-00002-of-00003
Le contenu de chaque fichier est le suivant. Le traitement étant effectué par un traitement parallèle distribué, il existe des fichiers avec un contenu vierge et un ou deux contenus. En outre, quel contenu est sorti vers quel fichier est aléatoire à chaque fois.
result.csv-00000-of-00003 Pas de contenu
result.csv-00001-of-00003
KV{Java, [1, 3, 2]}
result.csv-00002-of-00003
KV{Go, [5, 2, 9, 1]}
KV{Python, [5, 2, 6]}
Apache Beam 2.0.x avec Google Cloud Dataflow --Qiita commençant par IntelliJ et Gradle
Combiner avec GroupByKey|Documentation Cloud Dataflow| Google Cloud Platform
Recommended Posts