Afin d'apprendre à traiter les flux à l'aide de Cloud Dataflow sur Google Cloud Platform (GCP), tout d'abord, "[Cloud Dataflow + Cloud Pub / Sub + Fluentd a été utilisé pour créer un mécanisme de détection DoS](/ tetsuyam / items / 6636c6bbc3ee7fadfbe3) »a été essayé de fonctionner. Cependant, ** le SDK a changé pour Apache Beam base **, donc cela n'a pas fonctionné tel quel, donc je vais résumer les changements de code.
Plutôt que la sortie diff brute, elle est légèrement réorganisée afin que les différences correspondantes soient les unes à côté des autres.
--- ../Dos.java.orig	2017-11-15 17:22:31.583396781 +0900
+++ src/main/java/com/google/cloud/dataflow/examples/Dos.java	2017-11-15 17:17:45.067769398 +0900
@@ -1,35 +1,35 @@
 package com.google.cloud.dataflow.examples;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.TextIO;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+// import org.apache.beam.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.KV.OrderByValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.KV.OrderByValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+// import org.apache.beam.sdk.runners.BlockingDataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.direct.DirectRunner;
 import java.util.List;
 import org.joda.time.Duration;
@@ -52,9 +52,9 @@
   static class GetIPFn extends DoFn<String, String> {
     private static final long serialVersionUID = 0;
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+    // private final Aggregator<Long, Long> emptyLines =
+    //     createAggregator("emptyLines", new Sum.SumLongFn());
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Gson gson = new Gson();
       ApacheLog log = gson.fromJson(c.element() , ApacheLog.class);
@@ -67,7 +67,7 @@
   public static class FormatAsTextFnFromList extends DoFn<List<KV<String, Long>>, String> {
     private static final long serialVersionUID = 0;
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       for(KV value : c.element()){
         Integer access_count = new Integer(value.getValue().toString());
@@ -79,22 +79,22 @@
   public static void main(String[] args) {
     DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
     options.setProject("xxxxxxxxxxxx");//Spécifiez le projet
     options.setStagingLocation("gs://xxxxxxxxxxxx/staging");//Spécifiez le chemin où le fichier jar sera placé
     options.setJobName("doscheck");//Spécifiez le nom de ce travail
-    options.setRunner(DataflowPipelineRunner.class);//Je n'ai pas correctement recherché Runner. .. ..
+    options.setRunner(DataflowRunner.class);//Je n'ai pas correctement recherché Runner. .. ..
     options.setStreaming(true);//Je veux faire du traitement de flux, c'est vrai
     Pipeline p = Pipeline.create(options);//Créer une topologie appelée pipeline
     //Compléter la topologie pour créer un flux de traitement
-    p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/xxxxxxx/topics/input_topic"))//Obtenu sur Pubsub
+    p.apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic("projects/xxxxxxx/topics/input_topic"))//Obtenu sur Pubsub
      .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))))//Traitez pendant 10 secondes en glissant toutes les 5 secondes
      .apply(ParDo.of(new GetIPFn()))//Obtenir l'adresse IP du journal
      .apply(Count.<String>perElement())//Compter par IP
      .apply(Top.<KV<String, Long>, KV.OrderByValue<String,Long>>of(3,new KV.OrderByValue<String,Long>()).withoutDefaults())//Extraire le top 3
      .apply(ParDo.of(new FormatAsTextFnFromList()))//Convertir au format texte pour pouvoir être publié dans Pubsub
-     .apply(PubsubIO.Write.named("WriteCounts").topic("projects/xxxxxxxxxxx/topics/count_ip"));//Publier dans Pubsub
+     .apply("WriteCounts", PubsubIO.writeStrings().to("projects/xxxxxxxxxxx/topics/count_ip"));//Publier dans Pubsub
     p.run();//Exécuter le pipeline
   }
Consultez la [Note de publication: Dataflow SDK 2.x pour Java] de GCP (https://cloud.google.com/dataflow/release-notes/release-notes-java-2?hl=ja) pour connaître les modifications. Fait.
com.google.cloud.dataflow à ʻorg.apache.beam`PubsubIO ont été déplacés vers des sous-packagesPipeline du nom de la classeBlockingDataflowPipelineRunner a été supprimé et intégré dans DataflowRunner@ Override à @ ProcessElementPCollection.apply () au lieu d'utiliser named ()ReadStrings ()](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO. Instancier avec html # readStrings-) [^ t_read](idem pour l'écriture)topic () est [fromTopic ()](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO .Read.html # fromTopic-java.lang.String-) / [to ()](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk Remplacé par /io/gcp/pubsub/PubsubIO.Write.html#to-java.lang.String-)[^ t_read]: La documentation dit d'instancier avec <T> read (), mais c'est privé et ne peut pas être appelé. [readStrings ()](https://github.com/apache/beam/blob/v2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/ sdk / io / gcp / pubsub / PubsubIO.java # L469) etc. appellent cela à l'intérieur.
Je ne savais même pas comment créer un projet dans Maven, alors notez la commande basée sur Quickstart (https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven?hl=ja) Je le ferai. Je l'ai exécuté dans le terminal de la console GCP.
Création de projet
$ mvn archetype:generate \
      -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
      -DarchetypeGroupId=com.google.cloud.dataflow \
      -DarchetypeVersion=2.1.0 \
      -DgroupId=com.google.cloud.dataflow.examples \
      -DartifactId=dos \
      -Dversion="0.1" \
      -DinteractiveMode=false \
      -Dpackage=com.google.cloud.dataflow.examples
$ cd dos/
$ rm -rf src/main/java/com/google/cloud/dataflow/examples/*
$ vim src/main/java/com/google/cloud/dataflow/examples/Dos.java
Compiler et exécuter
$ mvn compile exec:java \
      -Dexec.mainClass=com.google.cloud.dataflow.examples.Dos
        Recommended Posts