[GO] Änderungen an "Ausführen von Cloud Dataflow + Cloud Pub / Sub + Fluentd ..." mit SDK 2.1

Um zu lernen, wie Streams mithilfe von Cloud Dataflow auf der Google Cloud Platform (GCP) verarbeitet werden, wurde zunächst "[Cloud Dataflow + Cloud Pub / Sub + Fluentd wurde verwendet, um einen Mechanismus für die DoS-Erkennung zu erstellen]" (/ tetsuyam / items / 6636c6bbc3ee7fadfbe3) ”wurde versucht zu funktionieren. ** Das SDK wurde jedoch in Apache Beam Base ** geändert, sodass es nicht so funktioniert hat, wie es ist. Daher werde ich die Codeänderungen zusammenfassen.

Code Unterschied

Anstelle der Rohdiff-Ausgabe wird sie leicht neu angeordnet, sodass die entsprechenden Unterschiede nebeneinander liegen.

--- ../Dos.java.orig	2017-11-15 17:22:31.583396781 +0900
+++ src/main/java/com/google/cloud/dataflow/examples/Dos.java	2017-11-15 17:17:45.067769398 +0900
@@ -1,35 +1,35 @@
 package com.google.cloud.dataflow.examples;

-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.TextIO;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+// import org.apache.beam.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.KV.OrderByValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.KV.OrderByValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+// import org.apache.beam.sdk.runners.BlockingDataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.direct.DirectRunner;

 import java.util.List;
 import org.joda.time.Duration;
@@ -52,9 +52,9 @@

   static class GetIPFn extends DoFn<String, String> {
     private static final long serialVersionUID = 0;
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+    // private final Aggregator<Long, Long> emptyLines =
+    //     createAggregator("emptyLines", new Sum.SumLongFn());
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Gson gson = new Gson();
       ApacheLog log = gson.fromJson(c.element() , ApacheLog.class);
@@ -67,7 +67,7 @@

   public static class FormatAsTextFnFromList extends DoFn<List<KV<String, Long>>, String> {
     private static final long serialVersionUID = 0;
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       for(KV value : c.element()){
         Integer access_count = new Integer(value.getValue().toString());
@@ -79,22 +79,22 @@

   public static void main(String[] args) {
     DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
     options.setProject("xxxxxxxxxxxx");//Projekt angeben
     options.setStagingLocation("gs://xxxxxxxxxxxx/staging");//Geben Sie den Pfad an, in dem die JAR-Datei abgelegt werden soll
     options.setJobName("doscheck");//Geben Sie den Namen dieses Jobs an
-    options.setRunner(DataflowPipelineRunner.class);//Ich habe Runner nicht richtig recherchiert. .. ..
+    options.setRunner(DataflowRunner.class);//Ich habe Runner nicht richtig recherchiert. .. ..
     options.setStreaming(true);//Ich möchte Stream-Verarbeitung machen, also wahr

     Pipeline p = Pipeline.create(options);//Erstellen Sie eine Topologie, die als Pipeline bezeichnet wird
     //Fleisch die Topologie, um einen Verarbeitungsablauf zu erstellen
-    p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/xxxxxxx/topics/input_topic"))//Erhalten von Pubsub
+    p.apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic("projects/xxxxxxx/topics/input_topic"))//Erhalten von Pubsub
      .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))))//10 Sekunden lang verarbeiten, dabei alle 5 Sekunden gleiten
      .apply(ParDo.of(new GetIPFn()))//IP aus dem Protokoll abrufen
      .apply(Count.<String>perElement())//Nach IP zählen
      .apply(Top.<KV<String, Long>, KV.OrderByValue<String,Long>>of(3,new KV.OrderByValue<String,Long>()).withoutDefaults())//Extrahieren Sie die Top 3
      .apply(ParDo.of(new FormatAsTextFnFromList()))//In ein Textformat konvertieren, damit es in Pubsub veröffentlicht werden kann
-     .apply(PubsubIO.Write.named("WriteCounts").topic("projects/xxxxxxxxxxx/topics/count_ip"));//In Pubsub veröffentlichen
+     .apply("WriteCounts", PubsubIO.writeStrings().to("projects/xxxxxxxxxxx/topics/count_ip"));//In Pubsub veröffentlichen
     p.run();//Führen Sie die Pipeline aus
   }

Weitere Informationen finden Sie in GCPs Versionshinweis: Dataflow SDK 2.x für Java. Hat.

[^ t_read]: In der Dokumentation wird angegeben, dass mit " read ()" instanziiert werden soll, es ist jedoch privat und kann nicht aufgerufen werden. [readStrings ()](https://github.com/apache/beam/blob/v2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/ sdk / io / gcp / pubsub / PubsubIO.java # L469) usw. rufen dies im Inneren auf.

Blinddarm

Maven-Betrieb

Ich wusste nicht einmal, wie man ein Projekt in Maven erstellt. Notieren Sie sich daher den Befehl basierend auf Quickstart (https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven?hl=ja). Ich werde das machen. Ich habe es im Terminal der GCP-Konsole ausgeführt.

Projekterstellung


$ mvn archetype:generate \
      -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
      -DarchetypeGroupId=com.google.cloud.dataflow \
      -DarchetypeVersion=2.1.0 \
      -DgroupId=com.google.cloud.dataflow.examples \
      -DartifactId=dos \
      -Dversion="0.1" \
      -DinteractiveMode=false \
      -Dpackage=com.google.cloud.dataflow.examples
$ cd dos/
$ rm -rf src/main/java/com/google/cloud/dataflow/examples/*
$ vim src/main/java/com/google/cloud/dataflow/examples/Dos.java

Kompilieren und ausführen


$ mvn compile exec:java \
      -Dexec.mainClass=com.google.cloud.dataflow.examples.Dos

Recommended Posts

Änderungen an "Ausführen von Cloud Dataflow + Cloud Pub / Sub + Fluentd ..." mit SDK 2.1
Führen Sie XGBoost mit Cloud Dataflow (Python) aus.
GCP: Wiederholen Sie den Vorgang von Pub / Sub zu Cloud-Funktionen und von Cloud-Funktionen zu Pub / Sub
Führen Sie eine Pipeline für maschinelles Lernen mit Cloud Dataflow (Python) aus.
Cloud-Funktionen zum Ändern der Bildgröße mithilfe von OpenCV mit Cloud Storage-Trigger
Hochladen von Dateien in den Cloud-Speicher mit dem Python-SDK von Firebase
Ermöglichen Sie die schnelle Ausführung von Python-Skripten in Cloud Run mithilfe des Responders
Um gym_torcs mit ubutnu16 auszuführen
Laden Sie Dateien mit SDK (Python-Version) auf Aspera hoch, das mit IBM Cloud Object Storage (ICOS) geliefert wird.
Einführung in Apache Beam mit Cloud-Datenfluss (über 2.0.0-Serie) ~ Grundlegender Teil ~ ParDo ~
Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Combine Edition ~