Cet article présente l'historique de l '** API Apache Flink Python ** et décrit son architecture, son environnement de développement et ses principaux opérateurs.
Apache Flink est un moteur informatique Big Data open source avec des capacités de traitement unifié de flux et de données par lots. Apache Flink 1.9.0 fournit une API d'apprentissage automatique (ML) et une nouvelle API Python. Voyons maintenant de plus près pourquoi Apache Flink prend en charge Python.
--Python est l'un des langages de développement les plus populaires
Selon les statistiques de RedMonk, Python est le troisième langage de développement le plus populaire après Java et JavaScript. RedMonk est une société d'analystes de l'industrie axée sur les développeurs de logiciels. Apache Flink est un moteur de calcul Big Data avec des capacités de traitement de données par flux et par lots. Quelle est la relation entre le sujet Python et Apache Flink? Avec cette question à l'esprit, jetons un coup d'œil aux désormais célèbres composants open source liés au big data. Par exemple, le premier cadre de traitement par lots Hadoop, la plate-forme de calcul de flux STORM, le Spark récemment populaire, l'entrepôt de données Hive et le HBase basé sur le stockage KV sont bien connus pour prendre en charge l'API Python. C'est un projet open source.
--Python est pris en charge par de nombreux projets open source.
Compte tenu de l'écosystème complet de Python, Apache Flink a investi massivement dans la version 1.9 pour lancer un tout nouveau PyFlink. En tant que big data, l'intelligence artificielle (IA) est étroitement liée à Python.
--Python est soutenu par l'apprentissage automatique (ML).
Selon les statistiques, il correspond à 0,129% des informations sur les emplois dans l'industrie du ML, faisant de Python le langage le plus recherché. Par rapport à 0,076% du langage R, nous pouvons voir que Python est préféré dans l'industrie du ML. Python, un langage interprétatif, a pour philosophie de conception «il n'y a qu'une seule façon de faire les choses». En raison de sa simplicité et de sa facilité d'utilisation, Python, l'un des langages les plus populaires au monde, est devenu un bon écosystème dans le domaine du Big Data Computing. Il a également un potentiel prometteur dans le domaine du ML. Donc, l'autre jour, nous avons annoncé l'API Python avec Apache Flink 1.9, qui a adopté une architecture complètement nouvelle.
Apache Flink est un moteur informatique doté de capacités de traitement de données par flux et par lots unifiées. La communauté attache une grande importance aux utilisateurs de Flink et souhaite fournir plus d'accès et de canaux à Flink comme Java et Scala. Cela rendra Flink plus pratique pour plus d'utilisateurs et bénéficiera de la valeur fournie par les capacités de calcul Big Data de Flink. À partir d'Apache Flink 1.9, la communauté Apache Flink lance une API Python avec une toute nouvelle architecture technique qui prend en charge les opérateurs les plus couramment utilisés tels que JOIN, AGG et WINDOW.
Python API - RoadMap
Dans Apache Flink 1.9, Python peut tirer parti des fonctions Java définies par l'utilisateur, mais il ne prend pas en charge la définition de fonctions définies par l'utilisateur natives Python. Par conséquent, Apache Flink 1.10 prend en charge les fonctions définies par l'utilisateur Python et la bibliothèque d'analyse de données Python Pandas. Apache Flink 1.11 ajoute également la prise en charge des API DataStream et ML.
La nouvelle architecture d'API Python se compose d'un module API utilisateur, d'un module de communication entre une machine virtuelle Python (VM) et une VM Java, et d'un module qui soumet des tâches au cluster Flink pour opération.
Comment les machines virtuelles Python et les machines virtuelles Java communiquent-elles? La machine virtuelle Python dispose d'une passerelle Python qui maintient une connexion avec la machine virtuelle Java qui dispose d'un GateWayServer qui reçoit des appels de la machine virtuelle Python.
Les versions Apache Flink antérieures à 1.9 prennent déjà en charge l'API Python dans les modules DataSet et DataStream. Cependant, ils utilisent chacun deux API différentes. API DataSet et API DataStream. Une architecture unifiée est cruciale pour un moteur de calcul de flux comme Flink, qui dispose de capacités de traitement de données par flux et par lots unifiées. Les API Python DataSet et DataStream existantes utilisent l'architecture technique de JPython. Cependant, JPython ne peut pas correctement prendre en charge la série Python 3.X. En conséquence, l'architecture API Python existante a été abandonnée et Flink 1.9 a adopté une architecture technologique complètement nouvelle. Cette nouvelle API Python est basée sur l'API Table.
La communication entre l'API Table et l'API Python est implémentée dans la communication entre la machine virtuelle Python et la machine virtuelle Java. L'API Python communique avec l'API Java, qui écrit et appelle. L'utilisation de l'API Python est similaire à l'utilisation de l'API Table de Java. La nouvelle architecture présente les avantages suivants:
Lorsqu'une machine virtuelle Python lance une demande pour un objet Java, la machine virtuelle Java crée l'objet, le stocke dans une structure de stockage et attribue un ID à l'objet. Il envoie ensuite cet ID à la machine virtuelle Python, qui manipule l'objet avec l'ID d'objet correspondant. Étant donné que la machine virtuelle Python peut manipuler tous les objets de la machine virtuelle Java, elle garantit que l'API de table Python a les mêmes fonctionnalités que l'API de table Java et peut tirer parti des modèles d'optimisation des performances existants.
Dans la nouvelle architecture et le nouveau modèle de communication, la machine virtuelle Python obtient l'ID d'objet Java correspondant et appelle l'API de table Java simplement en transmettant le nom et les paramètres de la méthode d'appel à la machine virtuelle Java. Par conséquent, le développement de l'API de table Python suit les mêmes étapes que le développement de l'API de table Java. Ensuite, explorons comment développer un travail d'API Python simple.
En règle générale, les travaux de table Python sont divisés en quatre étapes. Compte tenu de la situation actuelle, décidez d'abord si vous souhaitez exécuter le travail en mode batch ou en mode streaming. Les versions ultérieures des utilisateurs peuvent ignorer cette étape, mais les utilisateurs d'Apache Flink 1.9 doivent prendre cette décision.
Une fois que vous avez choisi un mode d'exécution de travail, vous savez d'où proviennent les données et comment vous définissez la source de données, le schéma et le type de données. Ensuite, écrivez la logique de calcul (l'opération de calcul effectuée sur les données) et conservez le résultat final du calcul dans le système spécifié. Ensuite, définissez l'évier. Définissez le schéma du récepteur et tous les types de champs qu'il contient, comme vous définiriez une source de données.
Ensuite, comprenons comment coder chacune des étapes ci-dessus à l'aide de l'API Python. Tout d'abord, créez un environnement d'exécution, qui devrait finalement être un environnement de table. Cet environnement de table doit avoir un module de configuration de table avec certains paramètres de configuration passés à la couche RunTime pendant le processus d'exécution. Ce module doit également fournir des paramètres personnalisés qui peuvent être utilisés pendant la phase de développement de service réelle.
Après avoir créé l'environnement d'exécution, vous devez définir la table de la source de données. À titre d'exemple, les enregistrements de données du fichier CSV sont séparés par des virgules (,) et les champs sont répertoriés dans la colonne de champ. Cette table ne contient qu'un seul champ, Word, qui est de type String.
Après avoir défini et décrit la source de données et converti la structure de la source de données en une table, quel type de structure de données et de type de données seront dans la couche API de table? Ensuite, voyons comment ajouter des champs et des types de champs en utilisant with_SCHEMA
. Ici, il n'y a qu'un seul champ et le type de données est String. La source de données est enregistrée en tant que table dans le catalogue pour les requêtes et calculs ultérieurs.
Créez ensuite une table de résultats. Une fois le calcul terminé, enregistrez le résultat du calcul dans un système persistant. Par exemple, pour écrire un travail WordCount, vous disposez d'abord d'une table de stockage avec deux champs: mot et nombre. Enregistrez ensuite cette table comme évier.
Après avoir enregistré le puits de table, voyons comment écrire la logique de calcul. En fait, écrire WordCount dans l'API Python est aussi simple que d'écrire dans l'API Table. Contrairement à DataStream, l'API Python ne nécessite qu'une seule ligne d'instruction pour écrire un travail WordCount. Par exemple, parcourez d'abord la table source et utilisez l'instruction GROUP BY pour regrouper les lignes par mot. Utilisez ensuite l'instruction SELECT pour sélectionner des mots et utilisez la fonction d'agrégation pour calculer le nombre de chaque mot. Enfin, insérez le résultat du calcul dans le tableau des résultats.
La question fatale est de savoir comment exécuter un travail WordCount. Commencez par configurer l'environnement de développement. Différentes versions du logiciel peuvent être installées sur différentes machines. Voici quelques-unes des conditions requises pour une version logicielle.
Deuxièmement, créez un package de version binaire Java basé sur le code source. Par conséquent, clonez le code dans la branche master pour obtenir la branche 1.9. Bien sûr, vous pouvez utiliser le code maître. Cependant, le code maître n'est pas stable, nous vous recommandons donc d'utiliser le code de branche 1.9. Continuons avec la procédure. Commencez par compiler le code. Par exemple:
//Télécharger Gendai
git clone https://github.com/apache/flink.git
//Enlèvement 1.9 minutes
cd flink; git fetch origin release-1.9
git checkout -b release-1.9 origin/release-1.9
//Système structuré à deux progrès
mvn clean install -DskipTests -Dfast
Après la compilation, placez le package de version dans le répertoire correspondant.
cd flink-dist/target/flink-1.9.0-bin/flink-1.9.0
tar -zcvf flink-1.9.0.tar.gz flink-1.9.0
Après avoir créé l'API Java, validez l'API et créez le package de version Python.
Tous les utilisateurs de Python savent que pour installer des packages via pip install, ils doivent soit intégrer leurs bibliothèques dépendantes à leur environnement Python local, soit installer ces bibliothèques dépendantes dans leur environnement local.
Cela s'applique également à Flink. Empaquetez et installez PyFlink dans un package de ressources reconnu par Pypip. Utilisez la commande suivante pour copier le package et l'installer dans votre environnement.
cd flink-Python;Python setup.py sdist
Ce processus encapsule simplement le package de version Java avec certains packages Java et certains packages Python du module PyFlink. Recherchez le nouveau paquet ʻapache-link-1.9.dev0.tar.gz` dans le répertoire dist.
cd dist/
Le fichier ʻapache-flink-1.9.dev0.tar.gz` dans le répertoire dist est un paquet PyFlink que vous pouvez installer avec pip install. Le package d'installation d'Apache Flink 1.9 comprend à la fois Flink Table et Flink Table Blink. Flink prend en charge deux planificateurs en même temps. Vous pouvez basculer librement entre le planificateur Flink par défaut et le planificateur Blink. Nous vous encourageons à essayer chacun d'entre eux. Après l'emballage, essayez de l'installer dans notre environnement.
Utilisez une commande très simple pour vérifier d'abord si la commande est correcte. Avant d'exécuter la commande, utilisez pip pour vérifier la liste pour voir si le package est déjà installé. Essayez ensuite d'installer le package que vous avez préparé à l'étape précédente. Dans un scénario réel, vous installeriez un nouveau package pour installer la mise à niveau.
pip install dist/*.tar.gz
pip list|grep flink
Après avoir installé le package, utilisez le travail WordCount que j'ai écrit plus tôt pour vérifier si l'environnement est correct. Pour vérifier si votre environnement est correct, clonez directement le référentiel de code d'environnement en exécutant la commande suivante:
git clone https://github.com/sunjincheng121/enjoyment.code.git
cd enjoyment.code; Python word_count.py
Ensuite, essayons-le. Recherchez le fichier de travail wordCount créé précédemment dans ce répertoire. Utilisons directement python word_count.py
pour vérifier s'il y a un problème dans l'environnement. L'API Apache Flink Python doit lancer un mini-cluster pour exécuter les travaux WordCount. À présent, le travail est déjà en cours d'exécution sur le mini-cluster.
Dans ce processus, le code lit d'abord le fichier source et écrit le résultat dans un fichier CSV. Dans ce répertoire, recherchez le fichier sink.csv
. Pour obtenir des instructions étape par étape, consultez la vidéo intitulée «Le statu quo et la planification de l'API Apache Flink Python» publiée sur Apache Flink Community China.
Parlons maintenant de la mise en place de l'environnement de développement intégré (IDE). Nous vous recommandons d'utiliser PyCharm pour développer la logique et les tâches liées à Python.
Pour plus d'informations sur la configuration de l'EDI, scannez le code QR ou visitez le blog (https://enjoyment.cool) directement. S'il te plait donne moi. Je pense qu'il existe de nombreux environnements Python, mais vous devez choisir celui que vous avez utilisé pour votre installation pip. C'est très important. Pour obtenir des instructions détaillées, consultez la vidéo intitulée «État actuel et plans de l'API Apache Flink Python».
Quels sont les moyens de soumettre un travail? Tout d'abord, utilisez la méthode CLI pour soumettre un travail à un cluster existant. Vous devez démarrer le cluster pour utiliser cette méthode. Le répertoire de construction est généralement sous build-target. Exécutez cette commande directement pour démarrer le cluster. Notez que ce processus utilise un port Web externe. Définissez le numéro de port dans le fichier flink-conf.yaml
. Démarrez ensuite le cluster à l'aide des commandes du PPT. Pour vérifier que le cluster a démarré avec succès, consultez les journaux ou visitez le site avec un navigateur. Si le cluster démarre correctement, voyons comment soumettre un travail.
Utilisez Flink run pour exécuter le code suivant afin de soumettre un travail.
./bin/flink run -py ~/training/0806/enjoyment.code/myPyFlink/enjoyment/word_count_cli.py
Utilisez py pour spécifier un fichier Python, pym pour spécifier un module Python, pyfs pour spécifier un fichier de ressources Python et j pour spécifier un package JAR.
Avec Apache Flink 1.9, il existe un moyen plus pratique. Le shell Python vous permet d'écrire de manière interactive les résultats obtenus avec l'API Python. Le shell Python fonctionne dans deux modes, local et distant, mais il n'y a pas de grande différence. Tout d'abord, essayez le mode local en exécutant la commande suivante.
bin/pyflink-shell.sh local
Cette commande démarre un mini cluster. L'exécution du code renvoie un logo Flink avec le texte FLINK --PYTHON --SHELL et quelques exemples de scripts démontrant cette fonctionnalité. La saisie de ces scripts renverra la sortie et les résultats corrects. Ici, vous pouvez écrire soit en streaming, soit en batch. Regardez la vidéo pour des instructions d'utilisation détaillées.
Vous avez maintenant une compréhension de base de l'architecture de l'API Apache Flink 1.9 Python Table et comment configurer l'API Python Table. Pour voir comment exécuter un travail dans l'EDI et soumettre un travail à l'aide de Flink run et Python Shell, j'ai pris en compte un exemple simple de WordCount. J'ai également expérimenté des façons interactives de tirer parti de l'API Python de Flink. Après avoir présenté les préférences Flink et un simple exemple de démonstration, nous discuterons des opérateurs clés dans Apache Flink 1.9.
Nous avons déjà vu comment créer un emploi. Tout d'abord, sélectionnez le mode d'exécution: sélectionnez streaming ou batch. Ensuite, définissez les tables (tables source et résultat), le schéma et les types de données à utiliser. Ensuite, écrivez la logique de calcul. Enfin, nous utilisons les fonctions d'agrégation intégrées de l'API Python Count, Sum, Max, Min. Par exemple, lorsque j'ai écrit un travail WordCount, j'ai utilisé la fonction Count.
Apache Flink 1.9 répond à la plupart de vos besoins habituels. Maintenant, en dehors de ce que nous avons vu jusqu'à présent, jetons un coup d'œil aux opérateurs de l'API Flink Table pris en charge par Apache Flink 1.9. Les opérateurs de l'API Flink Table (opérateur d'API de table Python et opérateur d'API de table Java) prennent en charge des opérations telles que:
Premièrement, les opérations à flux unique telles que SELECT, FILTER, les opérations d'agrégation, les opérations de fenêtre et les opérations de colonne (ʻadd_columns, drop_columns`).
Deuxièmement, les opérations à double flux telles que JOIN, MINUS et UNION.
Tous ces opérateurs sont pris en charge par l'API de table Python. Dans Apache Flink 1.9, l'API Python Table est fonctionnellement similaire à l'API Java Table. Ensuite, comprenons comment écrire les opérateurs ci-dessus et comment développer des opérateurs Python.
Comme vous l'avez peut-être remarqué dans cet article, nous n'avons pas abordé les séries chronologiques qui sont des attributs du flux de données. L'état objectif du flux de données est qu'il peut être en panne. Apache Flink utilise le mécanisme de Watermark pour traiter les flux de données dans le désordre.
Supposons que vous disposiez d'un fichier de données au format JSON contenant deux champs, a et DateTime. Pour définir un filigrane, vous devez ajouter une colonne rowtime lors de la création du schéma et définir le type de données rowtime sur Timestamp.
Définissez les filigranes de différentes manières. Utilisez watermarks_periodic_bounded
pour envoyer régulièrement des filigranes. Le nombre 60000 fait référence à 60000 ms, ce qui équivaut à 60 secondes ou 1 minute. Cette définition permet au programme de traiter des flux de données dans le désordre dans une période d'une minute. Par conséquent, plus la valeur est élevée, plus les données dans le désordre sont tolérantes et plus la latence est longue. Pour plus d'informations sur le fonctionnement du filigrane, consultez ce blog http://1t.click/7dM.
Enfin, je présenterai l'application des fonctions définies par l'utilisateur Java (UDF) dans Apache Flink 1.9. Apache Flink 1.9 ne prend pas en charge les UDF Python, mais vous pouvez tirer parti des UDF Java en Python. Apache Flink 1.9 optimise et reconstruit le module Table. Pour développer un UDF Java, importez une dépendance simple et développez une API Python. Importez Flink-table-common.
Voyons ensuite comment développer une API Python à l'aide de l'UDF de Java. Supposons que vous ayez besoin de développer un UDF qui calcule la longueur d'une chaîne. Vous devez enregistrer la fonction Java avec Python en utilisant t_env.register_java_function
, en passant le nom et le chemin complet de la fonction Java. Vous pouvez ensuite appeler l'UDF en utilisant le nom enregistré. Pour plus d'informations, consultez mon blog http://1t.click/HQF
Comment exécuter Java UDF? Exécutez-le en utilisant la commande d'exécution de Flink. Comme mentionné précédemment, nous utilisons -j pour inclure le package JAR UDF.
Java UDF prend-il uniquement en charge les fonctions scalaires? Java UDF prend en charge non seulement les fonctions scalaires, mais également les fonctions de table et les fonctions d'agrégation.
Voici quelques documents et liens couramment utilisés vers mon blog. J'espère qu'ils vous aideront.
Cet article a présenté l'historique et la feuille de route de développement de l'API Apache Flink Python. Ensuite, j'ai expliqué pourquoi nous avons changé l'architecture de l'API Apache Flink Python et les dernières architectures disponibles. Il a également décrit les futurs projets et les nouvelles fonctionnalités de l'API Apache Flink Python. Nous vous encourageons à partager vos suggestions et idées.
Recommended Posts