Um zu lernen, wie Streams mithilfe von Cloud Dataflow auf der Google Cloud Platform (GCP) verarbeitet werden, wurde zunächst "[Cloud Dataflow + Cloud Pub / Sub + Fluentd wurde verwendet, um einen Mechanismus für die DoS-Erkennung zu erstellen]" (/ tetsuyam / items / 6636c6bbc3ee7fadfbe3) ”wurde versucht zu funktionieren. ** Das SDK wurde jedoch in Apache Beam Base ** geändert, sodass es nicht so funktioniert hat, wie es ist. Daher werde ich die Codeänderungen zusammenfassen.
Anstelle der Rohdiff-Ausgabe wird sie leicht neu angeordnet, sodass die entsprechenden Unterschiede nebeneinander liegen.
--- ../Dos.java.orig 2017-11-15 17:22:31.583396781 +0900
+++ src/main/java/com/google/cloud/dataflow/examples/Dos.java 2017-11-15 17:17:45.067769398 +0900
@@ -1,35 +1,35 @@
package com.google.cloud.dataflow.examples;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.TextIO;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+// import org.apache.beam.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.KV.OrderByValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.KV.OrderByValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+// import org.apache.beam.sdk.runners.BlockingDataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.direct.DirectRunner;
import java.util.List;
import org.joda.time.Duration;
@@ -52,9 +52,9 @@
static class GetIPFn extends DoFn<String, String> {
private static final long serialVersionUID = 0;
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ // private final Aggregator<Long, Long> emptyLines =
+ // createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
Gson gson = new Gson();
ApacheLog log = gson.fromJson(c.element() , ApacheLog.class);
@@ -67,7 +67,7 @@
public static class FormatAsTextFnFromList extends DoFn<List<KV<String, Long>>, String> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
for(KV value : c.element()){
Integer access_count = new Integer(value.getValue().toString());
@@ -79,22 +79,22 @@
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setProject("xxxxxxxxxxxx");//Projekt angeben
options.setStagingLocation("gs://xxxxxxxxxxxx/staging");//Geben Sie den Pfad an, in dem die JAR-Datei abgelegt werden soll
options.setJobName("doscheck");//Geben Sie den Namen dieses Jobs an
- options.setRunner(DataflowPipelineRunner.class);//Ich habe Runner nicht richtig recherchiert. .. ..
+ options.setRunner(DataflowRunner.class);//Ich habe Runner nicht richtig recherchiert. .. ..
options.setStreaming(true);//Ich möchte Stream-Verarbeitung machen, also wahr
Pipeline p = Pipeline.create(options);//Erstellen Sie eine Topologie, die als Pipeline bezeichnet wird
//Fleisch die Topologie, um einen Verarbeitungsablauf zu erstellen
- p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/xxxxxxx/topics/input_topic"))//Erhalten von Pubsub
+ p.apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic("projects/xxxxxxx/topics/input_topic"))//Erhalten von Pubsub
.apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))))//10 Sekunden lang verarbeiten, dabei alle 5 Sekunden gleiten
.apply(ParDo.of(new GetIPFn()))//IP aus dem Protokoll abrufen
.apply(Count.<String>perElement())//Nach IP zählen
.apply(Top.<KV<String, Long>, KV.OrderByValue<String,Long>>of(3,new KV.OrderByValue<String,Long>()).withoutDefaults())//Extrahieren Sie die Top 3
.apply(ParDo.of(new FormatAsTextFnFromList()))//In ein Textformat konvertieren, damit es in Pubsub veröffentlicht werden kann
- .apply(PubsubIO.Write.named("WriteCounts").topic("projects/xxxxxxxxxxx/topics/count_ip"));//In Pubsub veröffentlichen
+ .apply("WriteCounts", PubsubIO.writeStrings().to("projects/xxxxxxxxxxx/topics/count_ip"));//In Pubsub veröffentlichen
p.run();//Führen Sie die Pipeline aus
}
Weitere Informationen finden Sie in GCPs Versionshinweis: Dataflow SDK 2.x für Java. Hat.
Aggregator
nicht, also habe ich es gelöscht)BlockingDataflowPipelineRunner
wurde entfernt und in DataflowRunner
integriertReadStrings ()
](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO. Instanziieren Sie mit HTML # readStrings-) [^ t_read](dasselbe gilt für Schreiben)topic ()
ist [fromTopic ()
](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO .Read.html # fromTopic-java.lang.String-) / [to ()
](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk Geändert zu /io/gcp/pubsub/PubsubIO.Write.html#to-java.lang.String-)[^ t_read]: In der Dokumentation wird angegeben, dass mit "readStrings ()
](https://github.com/apache/beam/blob/v2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/ sdk / io / gcp / pubsub / PubsubIO.java # L469) usw. rufen dies im Inneren auf.
Ich wusste nicht einmal, wie man ein Projekt in Maven erstellt. Notieren Sie sich daher den Befehl basierend auf Quickstart (https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven?hl=ja). Ich werde das machen. Ich habe es im Terminal der GCP-Konsole ausgeführt.
Projekterstellung
$ mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=2.1.0 \
-DgroupId=com.google.cloud.dataflow.examples \
-DartifactId=dos \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.google.cloud.dataflow.examples
$ cd dos/
$ rm -rf src/main/java/com/google/cloud/dataflow/examples/*
$ vim src/main/java/com/google/cloud/dataflow/examples/Dos.java
Kompilieren und ausführen
$ mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.dataflow.examples.Dos
Recommended Posts