Es ist ein Paket von WACUL Co., Ltd. und CTO. Alle in der Firma haben Spaß daran, Brot zu backen, Curry zu machen und Filme zu schauen.
Ich habe in letzter Zeit mit Google Cloud Dataflow herumgespielt und es daher als Einführung organisiert. Ich hoffe, es wird denjenigen helfen, die es zuerst berühren wollen, um einen groben Überblick zu bekommen. Ich habe keine fundierten Kenntnisse in Streaming oder Stapelverarbeitung, daher würde ich mich freuen, wenn Sie mir sagen könnten, was ich falsch gemacht habe.
Dies ist ein Referenzmaterial zu Google Cloud Dataflow, das ich bisher gelesen habe.
Lass uns trotzdem lesen
Ein von Google verfasster Kommentarartikel zur Migration von der vorhandenen Stapelverarbeitung zur Streaming-Verarbeitung. 101 ist eine Geschichte über grundlegende Konzepte, und Teil 102 ist eine Geschichte, die etwas weiter in den Datenfluss geht.
[Übersetzter Artikel] von kimutansk (http://qiita.com/kimutansk/items/447df5795768a483caa8) ist in Qiita, es ist zu schön.
Warum Apache Beam: Gründe, um Dataflow zum Eintritt in den Rivalen zu ermutigen
Ein Artikel, der den Hintergrund der Eröffnung des Dataflow-Programmiermodells durch Google erläutert. Die endgültige "Streaming- und Batch-Zukunft in Apache Beam" hat einen langen Weg hinter sich.
Cloud Dataflow ist eine umfangreiche Datenverarbeitungs-Engine und deren verwalteter Dienst. Im Allgemeinen wäre es schön, sich das als Begleiter von Hadoop, Spark usw. vorzustellen. Die Hauptmerkmale sind die Bereitstellung eines neuen Programmiermodells und einer vollständig verwalteten Ausführungsumgebung.
Der Unterschied zwischen Stapelverarbeitung und Streaming-Verarbeitung besteht darin, ob die verarbeiteten Daten endlich (begrenzt) oder unendlich (unbegrenzt) sind. Da die Stapelverarbeitungsplattform eine längere Geschichte hat und stabiler als die Streaming-Verarbeitungsplattform ist, wurde die Datenverarbeitung in großem Maßstab auf der Grundlage der Stapelverarbeitung entwickelt. Andererseits besteht eine wachsende Nachfrage nach schnelleren Geschäftsentscheidungen und einer schnelleren Bereitstellung von Daten für Benutzer sowie eine wachsende Nachfrage nach Streaming-Verarbeitungs-Engines, die kontinuierlich endlose Daten verarbeiten.
Daher wurde Lambda Architecture veröffentlicht, das Stapelverarbeitung und Streaming-Verarbeitung kombiniert, um das Endergebnis zu erzielen. Hat. Die Wartung eines Systems mit einer Lambda-Architektur kann jedoch eine entmutigende Aufgabe sein. Es ist schwer vorstellbar, dass Sie unterschiedliche Programmiermodelle verwenden und auf logischer Ebene konsistent sein müssen. .. ..
Das von Cloud Dataflow bereitgestellte Programmiermodell scheint darauf abzuzielen, die endliche Datenverarbeitung und die unendliche Datenverarbeitung basierend auf der Streaming-Verarbeitung zu integrieren. Um es grob auszudrücken: Wenn Sie ein Modell haben, das neue eingehende Daten genau verarbeitet, und eine Engine, die in einer realistischen Zeit ausgeführt werden kann, erhalten Sie das gleiche Ergebnis, wenn Sie alle vergangenen Daten wiedergeben und eingeben. Das ist gut! darüber.
Das Problem bei vorhandenen Streaming-Engines bestand darin, dass es schwierig war, die Integrität der Daten zu kontrollieren, hauptsächlich aufgrund der Schwierigkeit, mit dem Zeitkonzept umzugehen. Die Schwierigkeit, mit dem Zeitbegriff umzugehen, beruht auf der Tatsache, dass es in der Realität viele Fälle gibt, in denen sich die Ausführungszeit bei der tatsächlichen Verarbeitung von der Zeit unterscheidet, zu der das Ereignis tatsächlich eingetreten ist. Netzwerk- und Verarbeitungsverzögerungen sowie in extremen Fällen werden Protokolle für mobile Apps an den Server gesendet, wenn sie online geschaltet werden. Um dies zu bewältigen, ist es erforderlich, die Pufferverarbeitung und Daten zu handhaben, die sich von der tatsächlichen Ankunftsreihenfolge unterscheiden. Cloud Dataflow scheint in der Lage zu sein, dies gut zu handhaben, indem hier einige neue Konzepte eingeführt werden. (Ich verstehe es nicht vollständig. Weitere Informationen finden Sie im Referenzmaterial https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101, https://www.oreilly.com/ideas / die-Welt-jenseits-Batch-Streaming-102)
Dieses Programmiermodell war ursprünglich für die Ausführung auf GCP vorgesehen, wurde jedoch um Google herum im Mai 2016 als Apache Beam-Projekt zu Open Source (Beam ist übrigens eine Kombination aus Batch und Stream). ..
Google Cloud-Datenfluss, Apache Spark, Apache Flink, Apache Apex usw. können als Ausführungsmodul von Apache Beam ausgewählt werden, das jeden Schritt der Pipeline optimiert und die Verarbeitung effizient verteilt, die später beschrieben wird (mit Ausnahme des Cloud-Datenflusses nicht überprüft). Vor Ort scheint Flink das Beste zu sein.
Um genau zu sein, ist Cloud Dataflow als vollständig verwalteter Dienst positioniert, der Ausführungs-Engine von Apache Beam, die auf GCP ausgeführt wird. Wenn Sie es verschieben, wird die GCE-Instanz hinter den Kulissen gestartet. Angesichts der Wartungskosten der Infrastruktur denke ich, dass dies insbesondere für Startups eine attraktive Option sein kann.
Java or Python
Sie können Java oder Python als Implementierungssprache für Cloud Dataflow auswählen. Ab November 2016 ist die Python-Version jedoch möglicherweise nicht in der Beta verfügbar.
Eine solche. Es wird von nun an erfüllt sein.
Werfen wir einen Blick auf den Java-Code. Der Beispielcode befindet sich im github-Repository, das über die Google-Dokumentation verlinkt ist. Es ist organisiert.
Java8-Version, zählen Sie die Wörter in der Eingabe WordCount Nehmen wir als Beispiel .java).
Auszug nur für den Hauptteil (Für CHANGE müssen Sie die Projekt-ID in Ihrem GCP oder den Bucket-Namen von GCS eingeben.)
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
// CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud.
options.setProject("SET_YOUR_PROJECT_ID_HERE");
// CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files.
options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(new TypeDescriptor<String>() {}))
.apply(Filter.byPredicate((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
.withOutputType(new TypeDescriptor<String>() {}))
// CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to.
.apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
p.run();
}
Pipeline
In der Stimmung des Codes können Sie sehen, dass Dataflow ein funktionaler Stil ist, der den Rahmen der Verarbeitung definiert und nicht die Verarbeitung von Werten. Es ähnelt Rx (Reactive Extension) oder TensorFlow.
Eine Pipeline ist ein Objekt, das den Verarbeitungsfluss steuert und durch Übergeben von Optionen an "Pipeline.create" erstellt wird. Geben Sie optional die Einstellungen an, die zum Ausführen der Pipeline erforderlich sind.
Umschalten zwischen Batch-Modus und Streaming-Modus
options.setRunner(BlockingDataflowPipelineRunner.class);
Teil von
options.setRunner(DataflowPipelineRunner.class);
Zu Mit anderen Worten, beide werden von dem Fluss verarbeitet, der auf dem Pipeline-Objekt basiert.
PCollection, PTransform
Was wir an Pipelines Apply übergeben, ist ein "PTransform" -Objekt. Erstellt mit "FlatMapElements.via" und "Filter.byPredicate" zusammen mit Java8-Lambda-Ausdrücken.
"PTransform" ist eine Schnittstelle, die den Prozess des Empfangens von "PInput" und des Zurückgebens von "POutput" definiert. Für diese "PTransform" erledigt die Pipeline die Arbeit und übergibt die folgenden Werte. Die Verarbeitung des Startpunkts ist die "PInput" -Schnittstelle, und die Verarbeitung des Endpunkts ist die "POutput" -Schnittstelle, aber die Zwischenverarbeitung verarbeitet eine Instanz der "PCollection" -Klasse, die beide implementiert. Die "PCollection" ist das Objekt, das die Daten darstellt, die die Pipeline durchqueren. Andererseits werden die Datenkonvertierungs-, Verzweigungs- und Verbindungsprozesse auf der Pipeline erstellt.
Beispiel:
//Teilen Sie die vorherige Sammlung von Zeichenfolgen in Wörter(Machen Sie das Array am Ende flach)
input.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(new TypeDescriptor<String>() {})) ...
//Filtern Sie nur nach nicht leeren Sammlungen von Zeichenfolgen in der vorherigen Zeile
input.apply(Filter.byPredicate((String word) -> !word.isEmpty())) ...
//Aggregieren Sie die Sammlung von Zeichenfolgen in der vorherigen Zeile und konvertieren Sie sie in eine Sammlung von Karten mit Zählwerten nach Wert
input.apply(Count.<String>perElement()) ...
Derzeit unterstützte Ein- und Ausgänge
(Die Python-Version ist eine Textdatei und nur BigQuery)
Eingabe und Ausgabe sind ebenfalls eine Art "PTransform". Übergeben Sie sie daher zur Verarbeitung an "Übernehmen".
Beispiel:
//Lesen Sie eine Textdatei aus Google Cloud Storage und konvertieren Sie sie in eine zeilenweise Sammlung von Zeichenfolgen
pipeline.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) ...
//Konvertieren Sie Stream-Eingaben von Cloud Pubsub in eine Sammlung mit Zeitstempel
pipeline.apply(PubsubIO.Read
.timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
.subscription(options.getPubsubSubscription())) ...
//Empfängt eine Sammlung und gibt sie an BigQuery aus
//tableRef ist eine Information wie Tabellenposition, Schemainformation der Schematabelle
... .apply(BigQueryIO.Write.to(tableRef)
.withSchema(schema)
Wenn Sie Ihre Projekt- und Speicherinformationen in den Beispielcode einfügen und ausführen, können Sie den Ausführungsstatus über den Verwaltungsbildschirm von Cloud Dataflow überwachen.
Der in Pipeline definierte Prozess wurde in der Cloud verarbeitet. Hinter den Kulissen wird eine Instanz von Compute Engine gestartet und verarbeitet. Die Ausgabe dieses Beispiels wird ausgegeben, indem die Datei im von Ihnen erstellten Cloud Storage-Bucket in mehrere Teile aufgeteilt wird.
Die Textdatei der Eingabe dieses Beispiels ist übrigens
GLOUCESTER A poor unfortunate beggar.
EDGAR As I stood here below, methought his eyes
Were two full moons; he had a thousand noses,
Horns whelk'd and waved like the enridged sea:
It was some fiend; therefore, thou happy father,
Think that the clearest gods, who make them honours
Of men's impossibilities, have preserved thee.
GLOUCESTER I do remember now: henceforth I'll bear
Affliction till it do cry out itself
'Enough, enough,' and die. That thing you speak of,
I took it for a man; often 'twould say
'The fiend, the fiend:' he led me to that place.
Es ist wie ein Shakespeare-Drehbuch. Die Ausgabezahl pro Wort ist
decreased: 1
'shall': 2
War: 4
empress': 14
Provost: 99
stoops: 6
Es ist eine Datei im Format wie.
Ich habe ein einfaches Beispiel erstellt, um ein Gefühl für das raue Gefühl von Google Cloud Dataflow zu bekommen. Was ich fühlte war
Recommended Posts