[PYTHON] Was ist Google Cloud Dataflow?

Es ist ein Paket von WACUL Co., Ltd. und CTO. Alle in der Firma haben Spaß daran, Brot zu backen, Curry zu machen und Filme zu schauen.

Ich habe in letzter Zeit mit Google Cloud Dataflow herumgespielt und es daher als Einführung organisiert. Ich hoffe, es wird denjenigen helfen, die es zuerst berühren wollen, um einen groben Überblick zu bekommen. Ich habe keine fundierten Kenntnisse in Streaming oder Stapelverarbeitung, daher würde ich mich freuen, wenn Sie mir sagen könnten, was ich falsch gemacht habe.

Was Sie verstehen können, wenn Sie diesen Artikel lesen

Was Sie nach dem Lesen dieses Artikels nicht verstehen

Zunächst Referenzmaterialien

Dies ist ein Referenzmaterial zu Google Cloud Dataflow, das ich bisher gelesen habe.

Was ist Google Cloud Dataflow?

Cloud Dataflow ist eine umfangreiche Datenverarbeitungs-Engine und deren verwalteter Dienst. Im Allgemeinen wäre es schön, sich das als Begleiter von Hadoop, Spark usw. vorzustellen. Die Hauptmerkmale sind die Bereitstellung eines neuen Programmiermodells und einer vollständig verwalteten Ausführungsumgebung.

Programmiermodell, das Stapelverarbeitung und Streaming-Verarbeitung integriert

Der Unterschied zwischen Stapelverarbeitung und Streaming-Verarbeitung besteht darin, ob die verarbeiteten Daten endlich (begrenzt) oder unendlich (unbegrenzt) sind. Da die Stapelverarbeitungsplattform eine längere Geschichte hat und stabiler als die Streaming-Verarbeitungsplattform ist, wurde die Datenverarbeitung in großem Maßstab auf der Grundlage der Stapelverarbeitung entwickelt. Andererseits besteht eine wachsende Nachfrage nach schnelleren Geschäftsentscheidungen und einer schnelleren Bereitstellung von Daten für Benutzer sowie eine wachsende Nachfrage nach Streaming-Verarbeitungs-Engines, die kontinuierlich endlose Daten verarbeiten.

Daher wurde Lambda Architecture veröffentlicht, das Stapelverarbeitung und Streaming-Verarbeitung kombiniert, um das Endergebnis zu erzielen. Hat. Die Wartung eines Systems mit einer Lambda-Architektur kann jedoch eine entmutigende Aufgabe sein. Es ist schwer vorstellbar, dass Sie unterschiedliche Programmiermodelle verwenden und auf logischer Ebene konsistent sein müssen. .. ..

Das von Cloud Dataflow bereitgestellte Programmiermodell scheint darauf abzuzielen, die endliche Datenverarbeitung und die unendliche Datenverarbeitung basierend auf der Streaming-Verarbeitung zu integrieren. Um es grob auszudrücken: Wenn Sie ein Modell haben, das neue eingehende Daten genau verarbeitet, und eine Engine, die in einer realistischen Zeit ausgeführt werden kann, erhalten Sie das gleiche Ergebnis, wenn Sie alle vergangenen Daten wiedergeben und eingeben. Das ist gut! darüber.

Das Problem bei vorhandenen Streaming-Engines bestand darin, dass es schwierig war, die Integrität der Daten zu kontrollieren, hauptsächlich aufgrund der Schwierigkeit, mit dem Zeitkonzept umzugehen. Die Schwierigkeit, mit dem Zeitbegriff umzugehen, beruht auf der Tatsache, dass es in der Realität viele Fälle gibt, in denen sich die Ausführungszeit bei der tatsächlichen Verarbeitung von der Zeit unterscheidet, zu der das Ereignis tatsächlich eingetreten ist. Netzwerk- und Verarbeitungsverzögerungen sowie in extremen Fällen werden Protokolle für mobile Apps an den Server gesendet, wenn sie online geschaltet werden. Um dies zu bewältigen, ist es erforderlich, die Pufferverarbeitung und Daten zu handhaben, die sich von der tatsächlichen Ankunftsreihenfolge unterscheiden. Cloud Dataflow scheint in der Lage zu sein, dies gut zu handhaben, indem hier einige neue Konzepte eingeführt werden. (Ich verstehe es nicht vollständig. Weitere Informationen finden Sie im Referenzmaterial https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101, https://www.oreilly.com/ideas / die-Welt-jenseits-Batch-Streaming-102)

Dieses Programmiermodell war ursprünglich für die Ausführung auf GCP vorgesehen, wurde jedoch um Google herum im Mai 2016 als Apache Beam-Projekt zu Open Source (Beam ist übrigens eine Kombination aus Batch und Stream). ..

Vollständig verwaltete Ausführungs-Engine

Google Cloud-Datenfluss, Apache Spark, Apache Flink, Apache Apex usw. können als Ausführungsmodul von Apache Beam ausgewählt werden, das jeden Schritt der Pipeline optimiert und die Verarbeitung effizient verteilt, die später beschrieben wird (mit Ausnahme des Cloud-Datenflusses nicht überprüft). Vor Ort scheint Flink das Beste zu sein.

Um genau zu sein, ist Cloud Dataflow als vollständig verwalteter Dienst positioniert, der Ausführungs-Engine von Apache Beam, die auf GCP ausgeführt wird. Wenn Sie es verschieben, wird die GCE-Instanz hinter den Kulissen gestartet. Angesichts der Wartungskosten der Infrastruktur denke ich, dass dies insbesondere für Startups eine attraktive Option sein kann.

Java or Python

Sie können Java oder Python als Implementierungssprache für Cloud Dataflow auswählen. Ab November 2016 ist die Python-Version jedoch möglicherweise nicht in der Beta verfügbar.

Eine solche. Es wird von nun an erfüllt sein.

Werfen wir einen Blick auf den Code

Werfen wir einen Blick auf den Java-Code. Der Beispielcode befindet sich im github-Repository, das über die Google-Dokumentation verlinkt ist. Es ist organisiert.

Java8-Version, zählen Sie die Wörter in der Eingabe WordCount Nehmen wir als Beispiel .java).

Auszug nur für den Hauptteil (Für CHANGE müssen Sie die Projekt-ID in Ihrem GCP oder den Bucket-Namen von GCS eingeben.)

public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);

    options.setRunner(BlockingDataflowPipelineRunner.class);

    // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud.
    options.setProject("SET_YOUR_PROJECT_ID_HERE");

    // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files.
    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
         .withOutputType(new TypeDescriptor<String>() {}))
     .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
     .apply(Count.<String>perElement())
     .apply(MapElements
         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
         .withOutputType(new TypeDescriptor<String>() {}))

     // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to.
     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

    p.run();
}

Pipeline

In der Stimmung des Codes können Sie sehen, dass Dataflow ein funktionaler Stil ist, der den Rahmen der Verarbeitung definiert und nicht die Verarbeitung von Werten. Es ähnelt Rx (Reactive Extension) oder TensorFlow.

Eine Pipeline ist ein Objekt, das den Verarbeitungsfluss steuert und durch Übergeben von Optionen an "Pipeline.create" erstellt wird. Geben Sie optional die Einstellungen an, die zum Ausführen der Pipeline erforderlich sind.

Umschalten zwischen Batch-Modus und Streaming-Modus

options.setRunner(BlockingDataflowPipelineRunner.class);

Teil von

options.setRunner(DataflowPipelineRunner.class);

Zu Mit anderen Worten, beide werden von dem Fluss verarbeitet, der auf dem Pipeline-Objekt basiert.

PCollection, PTransform

Was wir an Pipelines Apply übergeben, ist ein "PTransform" -Objekt. Erstellt mit "FlatMapElements.via" und "Filter.byPredicate" zusammen mit Java8-Lambda-Ausdrücken.

"PTransform" ist eine Schnittstelle, die den Prozess des Empfangens von "PInput" und des Zurückgebens von "POutput" definiert. Für diese "PTransform" erledigt die Pipeline die Arbeit und übergibt die folgenden Werte. Die Verarbeitung des Startpunkts ist die "PInput" -Schnittstelle, und die Verarbeitung des Endpunkts ist die "POutput" -Schnittstelle, aber die Zwischenverarbeitung verarbeitet eine Instanz der "PCollection" -Klasse, die beide implementiert. Die "PCollection" ist das Objekt, das die Daten darstellt, die die Pipeline durchqueren. Andererseits werden die Datenkonvertierungs-, Verzweigungs- und Verbindungsprozesse auf der Pipeline erstellt.

Beispiel:

//Teilen Sie die vorherige Sammlung von Zeichenfolgen in Wörter(Machen Sie das Array am Ende flach)
input.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
         .withOutputType(new TypeDescriptor<String>() {})) ...

//Filtern Sie nur nach nicht leeren Sammlungen von Zeichenfolgen in der vorherigen Zeile
input.apply(Filter.byPredicate((String word) -> !word.isEmpty())) ...

//Aggregieren Sie die Sammlung von Zeichenfolgen in der vorherigen Zeile und konvertieren Sie sie in eine Sammlung von Karten mit Zählwerten nach Wert
input.apply(Count.<String>perElement()) ...

Input-Output

Derzeit unterstützte Ein- und Ausgänge

(Die Python-Version ist eine Textdatei und nur BigQuery)

Eingabe und Ausgabe sind ebenfalls eine Art "PTransform". Übergeben Sie sie daher zur Verarbeitung an "Übernehmen".

Beispiel:

//Lesen Sie eine Textdatei aus Google Cloud Storage und konvertieren Sie sie in eine zeilenweise Sammlung von Zeichenfolgen
pipeline.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) ...

//Konvertieren Sie Stream-Eingaben von Cloud Pubsub in eine Sammlung mit Zeitstempel
pipeline.apply(PubsubIO.Read
          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
          .subscription(options.getPubsubSubscription())) ...

//Empfängt eine Sammlung und gibt sie an BigQuery aus
//tableRef ist eine Information wie Tabellenposition, Schemainformation der Schematabelle
... .apply(BigQueryIO.Write.to(tableRef)
            .withSchema(schema)

Lass uns rennen

Wenn Sie Ihre Projekt- und Speicherinformationen in den Beispielcode einfügen und ausführen, können Sie den Ausführungsstatus über den Verwaltungsbildschirm von Cloud Dataflow überwachen.

Cloud_Dataflow_-_enoshima_and_Grafana_-_Mesos_Tasks.png

Der in Pipeline definierte Prozess wurde in der Cloud verarbeitet. Hinter den Kulissen wird eine Instanz von Compute Engine gestartet und verarbeitet. Die Ausgabe dieses Beispiels wird ausgegeben, indem die Datei im von Ihnen erstellten Cloud Storage-Bucket in mehrere Teile aufgeteilt wird.

Die Textdatei der Eingabe dieses Beispiels ist übrigens

GLOUCESTER      A poor unfortunate beggar.

EDGAR   As I stood here below, methought his eyes
        Were two full moons; he had a thousand noses,
        Horns whelk'd and waved like the enridged sea:
        It was some fiend; therefore, thou happy father,
        Think that the clearest gods, who make them honours
        Of men's impossibilities, have preserved thee.

GLOUCESTER      I do remember now: henceforth I'll bear
        Affliction till it do cry out itself
        'Enough, enough,' and die. That thing you speak of,
        I took it for a man; often 'twould say
        'The fiend, the fiend:' he led me to that place.

Es ist wie ein Shakespeare-Drehbuch. Die Ausgabezahl pro Wort ist

decreased: 1
'shall': 2
War: 4
empress': 14
Provost: 99
stoops: 6

Es ist eine Datei im Format wie.

Impressionen

Ich habe ein einfaches Beispiel erstellt, um ein Gefühl für das raue Gefühl von Google Cloud Dataflow zu bekommen. Was ich fühlte war

Was ich nicht weiß / Was ich versuchen möchte

Recommended Posts

Was ist Google Cloud Dataflow?
Was ist ein Namespace?
Was ist copy.copy ()
Was ist Django? .. ..
Was ist dotenv?
Was ist POSIX?
Was ist Linux?
Was ist klass?
Was ist Linux?
Was ist Python?
Was ist Hyperopt?
Was ist Linux?
Was ist Pyvenv?
Was ist __call__?
Was ist Linux?
Was ist Python?
Was ist eine Distribution?
Was ist Piotroskis F-Score?
Was ist Raspberry Pi?
Was ist das Calmar-Verhältnis?
Was ist ein Terminal?
[PyTorch Tutorial ①] Was ist PyTorch?
Was ist Hyperparameter-Tuning?
Was ist ein Hacker?
Was ist JSON? .. [Hinweis]
Cloud Dataflow Super Primer
Wofür ist Linux?
Was ist ein Zeiger?
Was ist Ensemble-Lernen?
Was ist TCP / IP?
Was ist Pythons __init__.py?
Was ist ein Iterator?
Was ist UNIT-V Linux?
[Python] Was ist virtualenv?
Was ist maschinelles Lernen?
Was ist Mini Sam oder Mini Max?
Ich habe das in Google Cloud Dataflow vorinstallierte Python-Paket überprüft
Was ist eine logistische Regressionsanalyse?
Was ist die Aktivierungsfunktion?
Was ist eine Instanzvariable?
Was ist ein Entscheidungsbaum?
Was ist ein Kontextwechsel?
[DL] Was ist Gewichtsverlust?
[Python] Python und Sicherheit - is Was ist Python?
Was ist ein Superuser?
Wettbewerbsprogrammierung ist was (Bonus)
[Python] * args ** Was ist kwrgs?
Was ist ein Systemaufruf?
[Definition] Was ist ein Framework?
Was ist die Schnittstelle für ...
Was ist Project Euler 3-Beschleunigung?
Was ist eine Rückruffunktion?
Was ist die Rückruffunktion?
Was ist Ihr "Tanimoto-Koeffizient"?
Python-Grundkurs (1 Was ist Python?)
Apache Beam 2.0.x mit Google Cloud Dataflow beginnend mit IntelliJ und Gradle
[Python] Was ist eine Zip-Funktion?