Cet article est l'article du 6ème jour du Calendrier de l'Avent Puri Puri Appliance 2019.
Cette fois, je vais vous présenter comment traiter une grande quantité de données par traitement distribué à l'aide de Dataflow (ApacheBeam) + Python3, que j'utilise habituellement en tant qu'ingénieur ML.
Que présenter cette fois
Ne pas introduire cette fois
Dataflow est l'un des services fournis par Google Could Platform. Site Web officiel de Dataflow Comme il est facile de mettre en œuvre le traitement distribué et qu'il est facile de le lier à BigQuery, il est souvent utilisé comme tableau d'analyse. Dataflow est implémenté en interne à l'aide d'Apache Beam, un framework d'implémentation du traitement de pipeline. Par conséquent, vous utiliserez probablement le SDK ApacheBeam pour le développement réel. Site Web officiel d'Apache Beam Dataflow prend en charge deux types de SDK ApachBeam, Java et Python.
Jetons un coup d'œil au flux entre la création de l'environnement et l'exécution du processus sur Dataflow.
Construisons un environnement virtuel pour le développement à l'aide de Pipenv. (Si les versions de Python et d'autres bibliothèques sont identiques, vous n'avez pas besoin d'utiliser Pipenv.) Lancez l'environnement en utilisant un Pipfile comme celui ci-dessous.
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
apache-beam=="1.14.*"
[requires]
python_version = "3.7"
Points à surveiller
--Version du SDK ApacheBeam Dataflow prend en charge les versions du SDK jusqu'à «1.14». --Version Python Pour le moment (6 décembre 2019), la version de PythonSDK prise en charge par Dataflow est la série 2, et un avertissement sera émis lors de l'utilisation de la série Python 3. Cependant, d'après mon expérience, il est extrêmement rare qu'un problème se produise et je pense qu'il est sûr de le faire fonctionner en production. (Cependant, nous ne pouvons pas assumer la responsabilité, veuillez donc l'utiliser à votre propre discrétion à la fin)
Vous aurez besoin de setup.py
pour l'enregistrer et l'exécuter en tant que modèle sur Dataflow.
Ici, nous décrirons les dépendances lors de l'exécution.
ʻEntry_points` Veuillez spécifier en fonction de votre propre configuration de paquet.
setup.py
PACKAGES = [
"apache-beam[gcp]==2.14.*",
]
setup(
name='dataflow-sample',
url='',
author='',
author_email='',
version='0.1',
install_requires=REQUIRED_PACKAGES,
packages=find_packages(),
include_package_data=True,
entry_points=dict(console_scripts=[
'sample=sample:main'
]),
description='dataflow sample',
)
Configurer pour exécuter le pipeline Apache Beam et implémenter la fonction principale. Les étapes requises sont les suivantes.
setup_sample.py
import sys
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def setup(args):
runner = "DirectRunner"
return beam.Pipeline(runner, options=PipelineOptions(args))
def main():
pipeline = setup(sys.args)
with pipeline as p:
#Décrivez le pipeline
pass
Pour travailler réellement avec Dataflow, il est nécessaire de définir diverses options spécifiées dans le SDK. Ce qui suit est une liste de ceux typiques.
StandardOptions
name | type | description |
---|---|---|
streaming | boolean | Sélectionnez le mode de diffusion en continu ou le mode par lots |
SetupOptions
name | type | description |
---|---|---|
setup_file | string | setup.Spécifiez le chemin de py |
GoogleCouldOptions
name | type | description |
---|---|---|
region | string | Spécifiez la région à utiliser |
project | string | Spécifiez l'ID de projet à utiliser |
job_name | string | Spécifiez le nom lorsque le travail a été exécuté(Valeur arbitraire) |
template_location | string | Spécifiez le chemin de GCP pour enregistrer le modèle |
Ces options doivent être spécifiées dans le code ou dans les arguments de ligne de commande au moment de l'exécution. Lorsqu'il est spécifié par code, c'est comme suit.
options_sample.py
def option_setting(options: PipelineOptions) -> PipelineOptions:
cloud_options = options.view_as(GoogleCloudOptions)
cloud_options.region = "asia-northeast1"
cloud_options.project = "your project id"
setup_options = options.view_as(SetupOptions)
setup_options.setup_file = "specify your setup.py path"
return options
def setup(args):
runner = "DirectRunner"
options = option_setting(PipelineOptions(args))
return beam.Pipeline(runner, options=options)
Fondamentalement, il se comporte comme des «options» que vous souhaitez définir avec «PipelineOptions.view_as ()». Il ne vous reste plus qu'à définir la valeur de la propriété que vous souhaitez spécifier.
Vous pouvez également créer vos propres options personnalisées si vous disposez des paramètres dont vous avez besoin au moment de l'exécution. L'implémentation hérite simplement de «PipelineOptions» et remplace les méthodes requises.
costom_options_sample.py
class CostomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--hoge',
type=int,
default=0,
help='This is Costom Value'
)
Définissons un pipeline qui lit réellement les données de BigQuery et les stocke dans BigQuery. À titre d'exemple simple, implémentons le processus d'extraction uniquement de l'identifiant de la table User et de l'insertion dans une autre table.
pipeline.py
def b2b_pipline(pipe: PCollection):
#Décrivez le SQL exécuté
query = "SELECT id, name, age FROM sample.users"
_ = (pipe
| "Read from BigQuery" >> beam.io.Read(BigQuerySource(query=query, use_standard_sql=True))
| "Preprocess" >> beam.Map(lambda data: data["id"])
| "Write to BigQuery" >> apache_beam.io.WriteToBigQuery(
table="user_ids",
schema="id:INTEGER",
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND,
)
)
Dans le traitement Pipline, trois opérations sont effectuées: entrée, opération intermédiaire et sortie. Il existe de nombreux types autres que ceux introduits cette fois, vous pouvez donc les personnaliser en vous référant à la Référence officielle.
Déplaçons le pipeline implémenté sur Local et GCP.
Vous avez le choix entre plusieurs environnements lors de l'exécution d'Apache Beam.
Cas d'utilisation | Runner | template_location |
---|---|---|
Exécuter en local | DirectRunner | None |
Exécuter avec Dataflow | DataflowRunner | None |
Exécuter en tant que modèle dans Dataflow | DataflowRunner | Spécifiez le chemin GCS pour enregistrer le modèle |
L'enregistrement du modèle vous permet d'enregistrer le pipeline dans GCS et de le lancer à partir de la console ou de la ligne de commande. Ceci est très utile lorsque vous souhaitez exécuter Pipeline à temps.
Cette fois, j'ai présenté comment implémenter ApcheBeam en Python et l'exécuter dans Dataflow. J'espère que cela vous sera utile.
Recommended Posts