Cet article est basé sur le contenu de la documentation Apache Beam (https://beam.apache.org/documentation/).
Il implémente un programme qui peut être traité par lots avec le SDK Apache Beam Python, et résume la procédure et la méthode pour l'exécuter avec Cloud Dataflow. Il aborde également les concepts de base d'Apache Beam, les tests et la conception.
Le SDK Apache Beam peut être sélectionné parmi ** Java **, ** Python **, ** Go ** et fournit les ** fonctions suivantes qui simplifient le mécanisme de traitement distribué **. Faire.
Les programmes créés par le SDK Apache Beam peuvent être exécutés sur des systèmes de traitement de données distribués tels que: Dans Apache Beam, cet environnement d'exécution est appelé ** Runner **.
Cette fois, nous l'exécuterons dans deux environnements d'exécution, DirectRunner et DataflowRunner.
Un programme Apache Beam général (simple) est créé et fonctionne comme suit.
Pour ce flux de processus, le pipeline serait le suivant:
Implémentons en fait un pipeline simple comme celui ci-dessus en Python. L'environnement d'exploitation est supposé être le suivant.
--Version Python: 2.7 ou supérieur pour 2 séries ou 3.5 ou supérieur pour 3 séries
Si vous n'avez pas besoin de packages supplémentaires, installez-les avec la commande suivante:
pip install apache-beam
Cette fois, nous supposons qu'il sera exécuté sur Dataflow (GCP), nous allons donc également installer des packages supplémentaires de GCP.
pip install apache-beam[gcp]
Ceci est le code complété. Je vais expliquer chacun d'eux ci-dessous.
pipeline.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
class MyOptions(PipelineOptions):
"""Options personnalisées."""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='./input.txt',
help='Input path for the pipeline')
parser.add_argument(
'--output',
default='./output.txt',
help='Output path for the pipeline')
class ComputeWordLength(beam.DoFn):
"""Processus de conversion pour trouver le nombre de caractères."""
def __init__(self):
pass
def process(self, element):
yield len(element)
def run():
options = MyOptions()
# options.view_as(StandardOptions).runner = 'DirectRunner'
p = beam.Pipeline(options=options)
(p
| 'ReadFromText' >> beam.io.ReadFromText(options.input) # I/Appliquer O Transform pour charger les données dans le chemin facultatif
| 'ComputeWordLength' >> beam.ParDo(ComputeWordLength()) #Appliquer la transformation
| 'WriteToText' >> beam.io.WriteToText(options.output)) # I/Appliquer O Transform et écrire des données dans le chemin facultatif
p.run()
if __name__ == '__main__':
run()
Pipeline L'objet Pipeline ** encapsule toutes vos tâches de traitement de données **. Les programmes Apache Beam créent généralement d'abord un objet Pipeline pour créer une PCollection et appliquer une Transform.
Pour utiliser le programme Apache Beam, vous devez d'abord créer une instance du pipeline Apache Beam SDK (généralement dans la fonction principale). Ensuite, lorsque vous créez le pipeline, vous définissez les options d'exécution.
Le code suivant est un exemple de création d'une instance de Pipeline.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
options = PipelineOptions() #Options d'exécution
p = beam.Pipeline(options=options)
Vous pouvez utiliser PipelineOptions pour définir les exécuteurs qui exécutent le pipeline et ** les options spécifiques requises pour l'exécuteur sélectionné **. Par exemple, il peut contenir des informations telles que l'ID du projet et l'emplacement de stockage des fichiers.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner' #Désignation du coureur
p = beam.Pipeline(options=options)
Il existe deux options, l'une consiste à le définir par programme et l'autre à le transmettre à partir d'un argument de ligne de commande. Un exemple est décrit dans [Exécuter avec Cloud Dataflow](Exécuter avec # cloud-dataflow-) ci-dessous.
Vous pouvez ajouter des ** options personnalisées ** en plus des PipelineOptions standard. L'exemple suivant ajoute une option pour spécifier les chemins d'entrée et de sortie. Les options personnalisées vous permettent également de spécifier une description ou une valeur par défaut qui sera affichée lorsque l'utilisateur passe --help
à partir d'un argument de ligne de commande.
Vous pouvez créer des options personnalisées en ** héritant de PipelineOptions **.
class MyOptions(PipelineOptions):
"""Options personnalisées."""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input', #Nom de l'option
default='./input.txt', #Valeur par défaut
help='Input path for the pipeline') #La description
parser.add_argument(
'--output',
default='./output.txt',
help='Output path for the pipeline')
Transmettez les options que vous avez créées comme suit:
p = beam.Pipeline(options=MyOptions())
Pour définir une option personnalisée sur une valeur autre que la valeur par défaut, transmettez une valeur à partir de l'argument de ligne de commande comme suit:
--input=value --output==value
PCollection Une PCollection est un objet ** qui représente un ensemble de données à distribuer. Dans le pipeline Apache Beam, Transform utilise PCollection comme entrée et sortie. Par conséquent, si vous souhaitez traiter les données dans le pipeline, vous devez créer une PCollection.
Après avoir créé un objet Pipeline, vous devez d'abord créer au moins une PCollection.
Utilisez I / O Transform pour lire des données à partir d'une source externe ou créer une PCollection à partir de la mémoire. Ce dernier est principalement utile pour les tests et le débogage.
Utilisez I / O Transform pour créer une PCollection à partir d'une source externe. Pour lire les données, appliquez la transformation de lecture fournie par chaque transformation d'E / S à l'objet Pipeline.
Voici comment appliquer une transformation en lecture à un pipeline pour créer une PCollection:
lines = p | 'ReadFromText' >> beam.io.ReadFromText('gs://some/input-data.txt')
Utilisez Créer une transformation pour créer une PCollection à partir de la mémoire.
lines = (p | 'ReadFromInMemory' >> beam.Create(['To be, or not to be: that is the question: ', 'Whether \'tis nobler in the mind to suffer ', 'The slings and arrows of outrageous fortune, ', 'Or to take arms against a sea of troubles, ']))
Transform Transform fournit un ** cadre de traitement général **. La transformation est appliquée à chaque élément de la PCollection d'entrée.
Le SDK Apache Beam fournit une variété de transformations que vous pouvez appliquer à votre PCollection. Cela inclut les ** transformations Core ** génériques telles que ParDo et Combine, ainsi que les ** transformations composites ** qui combinent une ou plusieurs transformations Core. Diverses transformations sont fournies, veuillez donc vous référer à ici.
Chaque transformation dans le SDK Apache Beam fournit l'opérateur de canal |
, vous pouvez donc appliquer la transformation en appliquant cette méthode à l'entrée PCollection.
[Output PCollection] = [Input PCollection] | [Transform]
Vous pouvez également chaîner des transformations pour créer un pipeline comme suit:
[Output PCollection] = ([Initial Input PCollection]
| [First Transform]
| [Second Transform]
| [Third Transform])
Ce pipeline a le même flux que cet exemple d'implémentation, le pipeline aura donc cette forme.
Transform crée une nouvelle PCollection sans apporter de modifications à la PCollection d'entrée. ** Transform ne modifie pas la PCollection d'entrée. ** PCollection est invariant par définition. Par conséquent, vous pouvez appliquer plusieurs transformations à la même PCollection pour créer une branche la PCollection.
[Output PCollection] = [Initial Input PCollection]
[Output PCollection A] = [Output PCollection] | [Transform A]
[Output PCollection B] = [Output PCollection] | [Transform B]
La forme de ce pipeline ressemble à ceci:
I/O Transform Lorsque vous créez un pipeline, vous devez souvent lire des données à partir d'une source externe, telle qu'un fichier ou une base de données. De même, vous pouvez générer des données du pipeline vers un système de stockage externe.
Le SDK Apache Beam fournit une transformation d'E / S pour les types de stockage de données courants (https://beam.apache.org/documentation/io/built-in/). Si vous souhaitez lire ou écrire un stockage de données non pris en charge, vous devez implémenter votre propre transformation d'E / S.
Read Transform transforme les données lues à partir d'une source externe en PCollection. Vous pouvez utiliser la transformation en lecture à tout moment lors de la création du pipeline, mais cela se fait généralement en premier.
lines = pipeline | beam.io.ReadFromText('gs://some/input-data.txt')
Write Transform écrit les données de PCollection dans une source de données externe. Pour imprimer les résultats d'un pipeline, utilisez la transformation d'écriture à la fin du pipeline dans la plupart des cas.
output | beam.io.WriteToText('gs://some/output-data')
De nombreuses transformations de lecture prennent en charge la lecture à partir de plusieurs fichiers d'entrée qui correspondent à l'opérateur glob. L'exemple suivant utilise l'opérateur glob (*) pour lire tous les fichiers d'entrée correspondants avec le préfixe «input-» et le suffixe «.csv» à l'emplacement spécifié.
lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')
Write Transform écrit par défaut dans plusieurs fichiers. Ce faisant, le nom de fichier est utilisé comme préfixe pour tous les fichiers de sortie.
L'exemple suivant écrit plusieurs fichiers dans un même emplacement. Chaque fichier est précédé de «nombres» et suffixé de «.csv».
output | 'WriteToText' >> beam.io.WriteToText('/path/to/numbers', file_name_suffix='.csv')
Exécutons maintenant le pipeline à l'aide du code terminé (#Completed Code). Exécutez localement et Cloud Dataflow comme environnement d'exécution.
Préparez un fichier texte contenant la chaîne de caractères suivante pour la saisie.
input.txt
good morning.
good afternoon.
good evening.
Pour exécuter le pipeline localement, définissez PipelineOptions sur DirectRunner
en tant que runner, mais vous n'avez pas besoin de spécifier explicitement le runner, sauf si vous avez un paramètre spécifique.
Exécutez la commande suivante à partir de la ligne de commande. Réécrivez la destination d'entrée et les chemins de destination de sortie en fonction de l'environnement.
python pipeline.py --input=./input.txt --output=./output.txt
Cet exemple d'implémentation est un pipeline qui compte le nombre de caractères dans un mot, le résultat suivant est donc généré.
De plus, par défaut, «beam.io.WriteToText» ajoute la chaîne «00000-of-00001» à la fin du nom de fichier pour distribuer et écrire dans plusieurs fichiers. Si vous voulez écrire dans un fichier, vous pouvez le faire en vidant l'argument shard_name_template
.
output.txt-00000-of-00001
13
15
13
Cloud Dataflow est un service entièrement géré fourni par GCP (Google Cloud Platfom) qui traite les données en mode flux ou en mode batch. .. Les utilisateurs peuvent traiter une énorme quantité de données en utilisant une capacité pratiquement illimitée sur une base de paiement à l'utilisation sans se soucier du fonctionnement des infrastructures telles que les serveurs.
L'exécution du pipeline dans Cloud Dataflow crée une tâche dans votre projet GCP qui utilise les ressources Compute Engine et Cloud Storage. Pour tirer parti de Cloud Dataflow, activez l '** API Dataflow ** dans GCP.
Une petite modification est nécessaire pour exécuter [Completed Code](#Completed Code) dans Cloud Dataflow. Modifiez-le comme suit.
pipeline.py
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
GCP_PROJECT_ID = 'my-project-id'
GCS_BUCKET_NAME = 'gs://my-bucket-name'
JOB_NAME = 'compute-word-length'
class MyOptions(PipelineOptions):
"""Options personnalisées."""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='{}/input.txt'.format(GCS_BUCKET_NAME), #Entrée à GCS.Mettre txt
help='Input for the pipeline')
parser.add_argument(
'--output',
default='{}/output.txt'.format(GCS_BUCKET_NAME), #Sortie vers GCS
help='Output for the pipeline')
class ComputeWordLength(beam.DoFn):
"""Processus de conversion pour trouver le nombre de caractères."""
def __init__(self):
pass
def process(self, element):
yield len(element)
def run():
options = MyOptions()
#Option GCP
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = GCP_PROJECT_ID #ID du projet
google_cloud_options.job_name = JOB_NAME #Nom de travail arbitraire
google_cloud_options.staging_location = '{}/binaries'.format(GCS_BUCKET_NAME) #Chemin GCS pour les fichiers intermédiaires
google_cloud_options.temp_location = '{}/temp'.format(GCS_BUCKET_NAME) #Chemin GCS pour les fichiers temporaires
#Options des travailleurs
options.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED' #Activer la mise à l'échelle automatique
#Option standard
options.view_as(StandardOptions).runner = 'DataflowRunner' #Spécifier le runner Dataflow
p = beam.Pipeline(options=options)
(p
| 'ReadFromText' >> beam.io.ReadFromText(options.input)
| 'ComputeWordLength' >> beam.ParDo(ComputeWordLength())
| 'WriteToText' >> beam.io.WriteToText(options.output, shard_name_template=""))
p.run()
# p.run().wait_until_finish() #Bloquer jusqu'à ce que le pipeline soit terminé
if __name__ == '__main__':
run()
Voir ici pour plus d'options Dataflow (https://cloud.google.com/dataflow/docs/guides/specifying-exec-params?hl=ja#-cloud-dataflow--). L'option «streaming» doit être «true» pour effectuer le streaming.
Cela peut également être exécuté avec une commande similaire.
python pipeline.py --input=gs://my-project-id/input.txt --output=gs://my-project-id/output.txt
Les options définies dans le programme peuvent également être passées à partir des arguments de ligne de commande comme celui-ci.
python pipeline.py \
--input=gs://my-project-id/input.txt \
--output=gs://my-project-id/output.txt \
--runner=DataflowRunner \
--project=my-project-id \
--temp_location=gs://my-project-id/tmp/
...
Vous pouvez surveiller le pipeline en accédant au service Dataflow à partir de GCP. L'interface utilisateur ressemble à ceci et le résultat est émis vers le chemin spécifié.
Si vous souhaitez exécuter ce traitement par lots de Dataflow régulièrement, il est pratique d'utiliser le ** modèle Dataflow **. Pour plus d'informations, consultez ici.
Lors du test de pipelines, ** les tests unitaires locaux peuvent souvent économiser beaucoup de temps et d'efforts ** par rapport au débogage d'exécutions à distance telles que Dataflow.
Vous devez installer les éléments suivants pour résoudre la dépendance:
pip install nose
Pour tester le pipeline, utilisez l'objet TestPipeline
. Au lieu de lire l'entrée depuis une source externe, utilisez ʻapache_beam.Create pour créer une PCollection à partir de la mémoire. Comparez la sortie avec ʻassert_that
.
test_pipeline.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from src.pipeline import ComputeWordLength
class PipelineTest(TestCase):
def test_pipeline(self):
expected = [
13,
15,
13
]
inputs = [
'good morning.',
'good afternoon.',
'good evening.'
]
with TestPipeline() as p:
actual = (p
| beam.Create(inputs)
| beam.ParDo(ComputeWordLength()))
assert_that(actual, equal_to(expected))
Dans [ci-dessus](application de # transform-), nous avons brièvement expliqué la conception (flux de traitement) lors de la création d'un pipeline simple et d'un pipeline de branchement. Ici, nous présenterons d'autres conceptions de pipeline courantes.
Cela peut être réalisé à l'aide de la fonctionnalité de sorties supplémentaires d'Apache Beam (https://beam.apache.org/documentation/programming-guide/#additional-outputs).
class ExtractWord(beam.DoFn):
def process(element):
if element.startswith('A'):
yield pvalue.TaggedOutput('a', element) #Donnez un nom de tag (commençant'A'Si c'est un élément de'a')
elif element.startswith('B'):
yield pvalue.TaggedOutput('b', element) #Donnez un nom de tag (commençant'B'Si c'est un élément de'b')
mixed_col = db_row_col | beam.ParDo(ExtractWord()).with_outputs()
mixed_col.a | beam.ParDo(...) # .Accessible par nom de balise
mixed_col.b | beam.ParDo(...)
Ceci peut être réalisé en utilisant «Flatten».
col_list = (a_col, b_col) | beam.Flatten()
Vous pouvez créer une PCollection à partir de chaque source d'entrée et la joindre avec CoGroupByKey
etc.
user_address = p | beam.io.ReadFromText(...)
user_order = p | beam.io.ReadFromText(...)
joined_col = (user_address, user_order) | beam.CoGroupByKey()
joined_col | beam.ParDo(...)
Vous souhaiterez peut-être également connaître les fonctionnalités suivantes afin de pouvoir gérer différents cas d'utilisation.
Composite transforms Les transformations composites sont une combinaison de plusieurs transformations (ParDo, Combine, GroupByKey ...). L'imbrication de plusieurs transformations rend votre code plus modulaire et plus facile à comprendre.
Pour implémenter des transformations composites, vous devez étendre la classe Transform et remplacer la méthode d'expansion.
"""Un pipeline qui compte le nombre de mots dans une phrase."""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class ComputeWordCount(beam.PTransform):
"""Transformations composites comptant le nombre de mots."""
def __init__(self):
pass
def expand(self, pcoll):
return (pcoll
| 'SplitWithHalfSpace' >> beam.Map(lambda element: element.split(' '))
| 'ComputeArraySize' >> beam.Map(lambda element: len(element)))
def run():
p = beam.Pipeline(options=PipelineOptions())
inputs = ['There is no time like the present.', 'Time is money.']
(p
| 'Create' >> beam.Create(inputs)
| 'ComputeWordCount' >> ComputeWordCount()
| 'WriteToText' >> beam.io.WriteToText('Chemin de destination de sortie'))
p.run()
if __name__ == '__main__':
run()
output
7
3
Side inputs Les entrées latérales sont une fonction qui vous permet de transmettre des entrées supplémentaires (entrées secondaires) à une transformation en plus des entrées normales (entrée principale) PCollection.
"""Un pipeline qui ne produit que des chaînes avec plus de caractères que la moyenne."""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam import pvalue
class FilterMeanLengthFn(beam.DoFn):
"""Filtrer les chaînes avec un nombre de caractères supérieur à la moyenne."""
def __init__(self):
pass
# mean_word_la longueur est une sous-entrée
def process(self, element, mean_word_length):
if len(element) >= mean_word_length:
yield element
def run():
p = beam.Pipeline(options=PipelineOptions())
inputs = ["good morning.", "good afternoon.", "good evening."]
#Sous-entrée
mean_word_length = word_lengths | 'ComputeMeanWordLength' >> beam.CombineGlobally(beam.combiners.MeanCombineFn())
#Entrée principale
output = (p
| 'Create' >> beam.Create(inputs)
| 'FilterMeanLength' >> beam.ParDo(FilterMeanLengthFn(), pvalue.AsSingleton(mean_word_length)) #Insérez une sous-entrée dans le deuxième argument de ParDo
| 'write to text' >> beam.io.WriteToText('Chemin de destination de sortie'))
p.run().wait_until_finish()
if __name__ == '__main__':
run()
Le nombre de caractères «bonjour», «bon après-midi» et «bonsoir». Sont respectivement «13», «15» et «13», et la moyenne est d'environ 13,67, le résultat est donc le suivant.
output
good afternoon.
Il décrit un peu "ce qui se passe dans le pipeline".
L'une des opérations les plus coûteuses du traitement de pipeline distribué est la ** sérialisation et la communication d'éléments entre les machines **. Le runner Apache Beam sérialise les éléments de PCollection, par exemple parce qu'il communique entre les machines. Communiquez les éléments entre la transformation et la transformation à l'étape suivante en utilisant les techniques suivantes:
Apache Beam se concentre sur le problème Parallèlement embarrassant. Comme Apache Beam attache une grande importance au traitement des éléments en parallèle, il n'est pas bon pour exprimer des actions telles que ** attribuer des numéros de séquence à chaque élément de PCollection **. En effet, ces algorithmes sont beaucoup plus susceptibles d'avoir des problèmes d'évolutivité.
** Le traitement de tous les éléments en parallèle ** présente également quelques inconvénients. Par exemple, lors de l'écriture d'un élément dans la destination de sortie. Dans le traitement de sortie, il n'est pas possible de traiter par lots tous les éléments en parallèle.
Par conséquent, le runner Apache Beam ne traite pas tous les éléments en même temps, mais regroupe et traite les éléments de PCollection. Dans le cas du traitement en continu, il a tendance à être groupé et traité en petites unités, et dans le cas du traitement par lots, il a tendance à être groupé et traité en unités plus grandes.
Lors de l'exécution d'un seul ParDo, le programme d'exécution Apache Beam peut diviser et regrouper les éléments de PCollection en deux.
Lorsque ParDo est exécuté, le worker traite les deux bundles en parallèle, comme illustré ci-dessous.
Puisqu'un seul élément ne peut pas être divisé, le parallélisme maximum d'une Transform dépend du nombre d'éléments dans la PCollection. Le nombre maximum de processus parallèles dans ce cas est de ** 9 ** comme le montre la figure.
Les ParDos peuvent être des parallèles subordonnés. Par exemple, ParDo1 et ParDo2 sont parallèles dépendants si la sortie de ParDo1 doit être traitée par le même worker comme suit:
Worker1 exécute ParDo1 sur les éléments du Bundle A, qui devient le Bundle C. Ensuite, ParDo2 est exécuté sur les éléments du Bundle C. De même, Worker2 exécute ParDo1 sur les éléments du Bundle B, qui devient Bundle D. Ensuite, ParDo2 est exécuté sur les éléments du Bundle D.
En exécutant ParDo de cette manière, les exécuteurs Apache Beam peuvent éviter de redistribuer des éléments entre les nœuds de calcul. Et cela économise des coûts de communication. Cependant, ** le nombre maximal de processus parallèles dépendra désormais du nombre maximal de processus parallèles pour le premier ParDo dans le parallèle dépendant. ** **
Si le traitement des éléments du Bundle échoue, le Bundle entier échouera. Par conséquent, le processus doit être retenté (sinon tout le pipeline échouera).
Dans l'exemple suivant, Worker1 gère avec succès les cinq éléments du bundle A. Worker2 gère les quatre éléments du Bundle B, mais les deux premiers éléments du Bundle B sont traités avec succès et le troisième élément échoue.
Le programme d'exécution Apache Beam réessaye ensuite tous les éléments du Bundle B, et la deuxième fois, il se termine avec succès. Comme indiqué, ** les tentatives ne se produisent pas toujours dans le même Worker que la tentative de traitement d'origine. ** **
Si les éléments de ParDo2 ne peuvent pas être traités après le traitement de ParDo1, ces deux transformations échoueront en même temps.
Dans l'exemple suivant, Worker2 exécute avec succès ParDo1 sur tous les éléments du Bundle B. Cependant, ParDo2 échoue car il ne peut pas gérer les éléments de Bundle D.
Par conséquent, le programme d'exécution Apache Beam doit ignorer la sortie ParDo2 et exécuter à nouveau le processus. Dans ce cas, le Bundle ParDo1 doit également être détruit et tous les éléments du ** Bundle doivent être retentés. ** **
J'ai essayé de résumer ce que j'avais appris en me basant sur le contenu de la documentation Apache Beam. Veuillez signaler toute erreur! : arc:
Recommended Posts