[PYTHON] Qu'est-ce que Google Cloud Dataflow?

Il s'agit d'un package de WACUL Co., Ltd. et CTO. Tout le monde dans l'entreprise passe un bon moment à faire du pain, à faire du curry et à regarder des films.

Ces derniers temps, j'ai joué avec Google Cloud Dataflow, je l'ai donc organisé comme une note d'introduction. J'espère que cela aidera ceux qui sont sur le point de le toucher en premier à avoir un aperçu approximatif. Je n'ai pas d'expertise approfondie en streaming ou en traitement par lots, alors j'apprécierais que vous me disiez ce que je faisais mal.

Ce que vous pouvez comprendre en lisant cet article

Ce que vous ne comprenez pas après avoir lu cet article

Tout d'abord, des matériaux de référence

Il s'agit d'un matériel de référence lié à Google Cloud Dataflow que j'ai lu jusqu'à présent.

Qu'est-ce que Google Cloud Dataflow?

Cloud Dataflow est un moteur de traitement de données à grande échelle et son service géré. En général, ce serait bien de le considérer comme un compagnon de Hadoop, Spark, etc. Les principales caractéristiques sont la fourniture d'un nouveau modèle de programmation et d'un environnement d'exécution entièrement géré.

Modèle de programmation qui intègre le traitement par lots et le traitement en continu

La différence entre le traitement par lots et le traitement par flux est de savoir si les données traitées sont finies (bornées) ou infinies (illimitées). La plate-forme de traitement par lots ayant une histoire plus longue et étant plus stable que la plate-forme de traitement en continu, le traitement de données à grande échelle a été conçu sur la base du traitement par lots. D'un autre côté, il existe une demande croissante pour des décisions commerciales plus rapides et une livraison plus rapide des données aux utilisateurs, et une demande croissante de moteurs de traitement en continu qui traitent en permanence des données infinies.

Par conséquent, Lambda Architecture, qui combine le traitement par lots et le traitement en continu pour fournir le résultat final, est apparu. Fait. Cependant, maintenir un système construit avec une architecture lambda peut être une tâche ardue. Il est difficile d'imaginer que vous deviez utiliser différents modèles de programmation et être cohérent au niveau logique. .. ..

Le modèle de programmation fourni par Cloud Dataflow semble avoir pour objectif d'intégrer un traitement de données fini et un traitement de données infini basé sur un traitement en continu. En gros, si vous avez un modèle qui traite avec précision les nouvelles données entrantes et un moteur qui peut être fait dans un temps réaliste, si vous lisez toutes les données passées et les saisissez, vous obtiendrez le même résultat. C'est très bien! à propos de ça.

Le problème avec les moteurs de streaming existants était qu'il était difficile de contrôler l'intégrité des données, principalement en raison de la difficulté de gérer la notion de temps. La difficulté de traiter la notion de temps est due au fait qu'en réalité, il existe de nombreux cas où le moment d'exécution du traitement réel est différent du moment où l'événement s'est réellement produit. Retards de réseau et de traitement, et dans les cas extrêmes, les journaux des applications mobiles sont envoyés au serveur lorsqu'ils sont en ligne. Pour faire face à cela, il est nécessaire de gérer le traitement de mise en mémoire tampon et les données qui sont différentes de l'ordre d'arrivée réel. Cloud Dataflow semble être en mesure de bien gérer cela en introduisant ici de nouveaux concepts. (Je ne comprends pas complètement, veuillez donc consulter le matériel de référence https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101, https://www.oreilly.com/ideas / le-monde-au-delà-par lots-streaming-102)

Ce modèle de programmation était à l'origine dédié à l'exécution sur GCP, mais est devenu open source par Google vers mai 2016 en tant que projet Apache Beam (au fait, Beam est une combinaison de Batch et Stream). ..

Moteur d'exécution entièrement géré

Google Cloud Dataflow, Apache Spark, Apache Flink, Apache Apex, etc. peuvent être sélectionnés comme moteur d'exécution d'Apache Beam qui optimise chaque étape du pipeline et distribue efficacement le traitement, qui sera décrit plus tard (non vérifié sauf pour Cloud Dataflow). Sur place, Flink semble être le meilleur.

Donc, pour être précis, Cloud Dataflow se positionne comme un service entièrement géré, le moteur d'exécution d'Apache Beam fonctionnant sur GCP. Lorsque vous le déplacez, l'instance GCE sera lancée dans les coulisses. Compte tenu du coût de maintenance de l'infrastructure, je pense que cela peut être une option intéressante en particulier pour les startups.

Java or Python

Vous pouvez choisir Java ou Python comme langage d'implémentation pour Cloud Dataflow. Cependant, à partir de novembre 2016, la version Python peut ne pas être disponible en version bêta.

Tel. Il sera désormais rempli.

Jetons un œil au code

Jetons un coup d'œil au code Java. L'exemple de code se trouve dans le référentiel github lié à partir de la documentation Google. C'est organisé.

Version Java8, comptage des mots en entrée [WordCount](https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/master/src/main/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8 Prenons .java) comme exemple.

Extrait uniquement pour la partie principale (Pour CHANGE, vous devez saisir l'ID de projet sur votre GCP ou le nom du bucket GCS)

public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);

    options.setRunner(BlockingDataflowPipelineRunner.class);

    // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud.
    options.setProject("SET_YOUR_PROJECT_ID_HERE");

    // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files.
    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
         .withOutputType(new TypeDescriptor<String>() {}))
     .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
     .apply(Count.<String>perElement())
     .apply(MapElements
         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
         .withOutputType(new TypeDescriptor<String>() {}))

     // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to.
     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

    p.run();
}

Pipeline

Dans l'ambiance du code, vous pouvez voir que Dataflow est un style fonctionnel qui définit le cadre de traitement plutôt que de traitement des valeurs. C'est similaire à Rx (Reactive Extension) ou TensorFlow.

Un pipeline est un objet qui contrôle le flux de traitement et est créé en passant des options à Pipeline.create. Spécifiez éventuellement les paramètres requis pour exécuter le pipeline.

Basculer entre le mode batch et le mode streaming

options.setRunner(BlockingDataflowPipelineRunner.class);

Partie de

options.setRunner(DataflowPipelineRunner.class);

À En d'autres termes, les deux seront traités par le flux construit sur l'objet Pipeline.

PCollection, PTransform

Ce que nous transmettons à Apply de Pipeline est un objet PTransform. Généré avec FlatMapElements.via et Filter.byPredicate avec les expressions lambda java8.

«PTransform» est une interface qui définit le processus de réception de «PInput» et de retour de «POutput». Pour cette PTransform, le pipeline fera le travail en transmettant les valeurs suivantes. Le traitement du point de départ est l'interface «PInput», et le traitement du point final est l'interface «POutput», mais le traitement intermédiaire gère une instance de la classe «PCollection» qui implémente les deux. La «PCollection» est l'objet qui représente les données qui traversent le pipeline, et d'autre part, la conversion, le branchement et le traitement de jointure des données sont construits sur le pipeline.

Exemple:

//Divisez la collection précédente de chaînes en mots(Rendre le tableau plat à la fin)
input.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
         .withOutputType(new TypeDescriptor<String>() {})) ...

//Filtrer uniquement pour les collections non vides de chaînes de la ligne précédente
input.apply(Filter.byPredicate((String word) -> !word.isEmpty())) ...

//Agréger la collection de chaînes de la ligne précédente et la convertir en une collection de cartes de décomptes par valeur
input.apply(Count.<String>perElement()) ...

Entrée sortie

Entrées et sorties actuellement prises en charge

(La version python est un fichier texte et BigQuery uniquement)

L'entrée et la sortie sont également un type de PTransform, alors passez-le à ʻapply` pour le traitement.

Exemple:

//Lisez un fichier texte depuis Google Cloud Storage et convertissez-le en une collection ligne par ligne de chaînes
pipeline.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) ...

//Convertir l'entrée de flux de Cloud Pubsub en une collection horodatée
pipeline.apply(PubsubIO.Read
          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
          .subscription(options.getPubsubSubscription())) ...

//Reçoit une collection et la transmet à BigQuery
//tableRef est des informations telles que l'emplacement de la table, les informations de schéma de la table de schéma
... .apply(BigQueryIO.Write.to(tableRef)
            .withSchema(schema)

Courons

Si vous placez vos informations de projet et de stockage dans l'exemple de code et que vous l'exécutez, vous pouvez surveiller l'état d'exécution à partir de l'écran de gestion de Cloud Dataflow.

Cloud_Dataflow_-_enoshima_and_Grafana_-_Mesos_Tasks.png

Le processus défini dans Pipeline a été traité dans le cloud. Dans les coulisses, une instance de Compute Engine est lancée et traitée. La sortie de cet exemple est générée en divisant le fichier en plusieurs parties sur le bucket Cloud Storage que vous avez créé.

À propos, le fichier texte de l'entrée de cet exemple est

GLOUCESTER      A poor unfortunate beggar.

EDGAR   As I stood here below, methought his eyes
        Were two full moons; he had a thousand noses,
        Horns whelk'd and waved like the enridged sea:
        It was some fiend; therefore, thou happy father,
        Think that the clearest gods, who make them honours
        Of men's impossibilities, have preserved thee.

GLOUCESTER      I do remember now: henceforth I'll bear
        Affliction till it do cry out itself
        'Enough, enough,' and die. That thing you speak of,
        I took it for a man; often 'twould say
        'The fiend, the fiend:' he led me to that place.

C'est comme un scénario de Shakespeare. Le nombre de sorties par mot est

decreased: 1
'shall': 2
War: 4
empress': 14
Provost: 99
stoops: 6

C'est un fichier au format comme.

Impressions

J'ai couru un exemple simple pour avoir une idée de la sensation approximative de Google Cloud Dataflow. Ce que j'ai ressenti était

Ce que je ne sais pas / ce que je veux essayer

Recommended Posts

Qu'est-ce que Google Cloud Dataflow?
Qu'est-ce que l'espace de noms
Qu'est-ce que copy.copy ()
Qu'est-ce que Django? .. ..
Qu'est-ce que dotenv?
Qu'est-ce que POSIX
Qu'est-ce que Linux
Qu'est-ce que le klass?
Qu'est-ce que Linux?
Qu'est-ce que python
Qu'est-ce que l'hyperopt?
Qu'est-ce que Linux
Qu'est-ce que pyvenv
Qu'est-ce que __call__
Qu'est-ce que Linux
Qu'est-ce que Python
Qu'est-ce qu'une distribution?
Qu'est-ce que le F-Score de Piotroski?
Qu'est-ce que Raspberry Pi?
Qu'est-ce que Calmar Ratio?
Qu'est-ce qu'un terminal?
[Tutoriel PyTorch ①] Qu'est-ce que PyTorch?
Qu'est-ce que le réglage des hyper paramètres?
Qu'est-ce qu'un hacker?
Qu'est-ce que JSON? .. [Remarque]
Cloud Dataflow Super Primer
À quoi sert Linux?
Qu'est-ce qu'un pointeur?
Qu'est-ce que l'apprentissage d'ensemble?
Qu'est-ce que TCP / IP?
Qu'est-ce que __init__.py de Python?
Qu'est-ce qu'un itérateur?
Qu'est-ce que UNIT-V Linux?
[Python] Qu'est-ce que virtualenv
Qu'est-ce que l'apprentissage automatique?
Qu'est-ce que Mini Sam ou Mini Max?
J'ai vérifié le package Python pré-installé dans Google Cloud Dataflow
Qu'est-ce que l'analyse de régression logistique?
Quelle est la fonction d'activation?
Qu'est-ce qu'une variable d'instance?
Qu'est-ce qu'un arbre de décision?
Qu'est-ce qu'un changement de contexte?
[DL] Qu'est-ce que la décroissance du poids?
[Python] Python et sécurité-① Qu'est-ce que Python?
Qu'est-ce qu'un super utilisateur?
La programmation du concours, c'est quoi (bonus)
[Python] * args ** Qu'est-ce que kwrgs?
Qu'est-ce qu'un appel système
[Définition] Qu'est-ce qu'un cadre?
A quoi sert l'interface ...
Qu'est-ce que Project Euler 3 Acceleration?
Qu'est-ce qu'une fonction de rappel?
Qu'est-ce que la fonction de rappel?
Quel est votre "coefficient de Tanimoto"?
Cours de base Python (1 Qu'est-ce que Python)
Apache Beam 2.0.x avec Google Cloud Dataflow commençant par IntelliJ et Gradle
[Python] Qu'est-ce qu'une fonction zip?