Traitez le Big Data avec Dataflow (ApacheBeam) + Python3

introduction

Cet article est l'article du 6ème jour du Calendrier de l'Avent Puri Puri Appliance 2019.

Cette fois, je vais vous présenter comment traiter une grande quantité de données par traitement distribué à l'aide de Dataflow (ApacheBeam) + Python3, que j'utilise habituellement en tant qu'ingénieur ML.

Que présenter cette fois

Ne pas introduire cette fois

Qu'est-ce que Dataflow? Qu'est-ce qu'Apache Beam?

Dataflow est l'un des services fournis par Google Could Platform. Site Web officiel de Dataflow Comme il est facile de mettre en œuvre le traitement distribué et qu'il est facile de le lier à BigQuery, il est souvent utilisé comme tableau d'analyse. Dataflow est implémenté en interne à l'aide d'Apache Beam, un framework d'implémentation du traitement de pipeline. Par conséquent, vous utiliserez probablement le SDK ApacheBeam pour le développement réel. Site Web officiel d'Apache Beam Dataflow prend en charge deux types de SDK ApachBeam, Java et Python.

Je vais vraiment le toucher

Jetons un coup d'œil au flux entre la création de l'environnement et l'exécution du processus sur Dataflow.

Environnement

Construisons un environnement virtuel pour le développement à l'aide de Pipenv. (Si les versions de Python et d'autres bibliothèques sont identiques, vous n'avez pas besoin d'utiliser Pipenv.) Lancez l'environnement en utilisant un Pipfile comme celui ci-dessous.

[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
apache-beam=="1.14.*"

[requires]
python_version = "3.7"

Points à surveiller

--Version du SDK ApacheBeam Dataflow prend en charge les versions du SDK jusqu'à «1.14». --Version Python Pour le moment (6 décembre 2019), la version de PythonSDK prise en charge par Dataflow est la série 2, et un avertissement sera émis lors de l'utilisation de la série Python 3. Cependant, d'après mon expérience, il est extrêmement rare qu'un problème se produise et je pense qu'il est sûr de le faire fonctionner en production. (Cependant, nous ne pouvons pas assumer la responsabilité, veuillez donc l'utiliser à votre propre discrétion à la fin)

Créer Setup.py

Vous aurez besoin de setup.py pour l'enregistrer et l'exécuter en tant que modèle sur Dataflow. Ici, nous décrirons les dépendances lors de l'exécution. ʻEntry_points` Veuillez spécifier en fonction de votre propre configuration de paquet.

setup.py


PACKAGES = [
    "apache-beam[gcp]==2.14.*",
]

setup(
    name='dataflow-sample',
    url='',
    author='',
    author_email='',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    entry_points=dict(console_scripts=[
        'sample=sample:main'
    ]),
    description='dataflow sample',
)

Configurer le pipeline

Configurer pour exécuter le pipeline Apache Beam et implémenter la fonction principale. Les étapes requises sont les suivantes.

setup_sample.py


import sys
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def setup(args):
    runner = "DirectRunner" 
    return beam.Pipeline(runner, options=PipelineOptions(args))


def main():
    pipeline = setup(sys.args)
    with pipeline as p:
        #Décrivez le pipeline
        pass

Définir les options

Pour travailler réellement avec Dataflow, il est nécessaire de définir diverses options spécifiées dans le SDK. Ce qui suit est une liste de ceux typiques.

StandardOptions

name type description
streaming boolean Sélectionnez le mode de diffusion en continu ou le mode par lots

SetupOptions

name type description
setup_file string setup.Spécifiez le chemin de py

GoogleCouldOptions

name type description
region string Spécifiez la région à utiliser
project string Spécifiez l'ID de projet à utiliser
job_name string Spécifiez le nom lorsque le travail a été exécuté(Valeur arbitraire)
template_location string Spécifiez le chemin de GCP pour enregistrer le modèle

Ces options doivent être spécifiées dans le code ou dans les arguments de ligne de commande au moment de l'exécution. Lorsqu'il est spécifié par code, c'est comme suit.

options_sample.py


def option_setting(options: PipelineOptions) -> PipelineOptions:
    cloud_options = options.view_as(GoogleCloudOptions)
    cloud_options.region = "asia-northeast1"
    cloud_options.project = "your project id"
    
    setup_options = options.view_as(SetupOptions)
    setup_options.setup_file = "specify your setup.py path"
    return options

def setup(args):
    runner = "DirectRunner" 
    options = option_setting(PipelineOptions(args))
    return beam.Pipeline(runner, options=options)

Fondamentalement, il se comporte comme des «options» que vous souhaitez définir avec «PipelineOptions.view_as ()». Il ne vous reste plus qu'à définir la valeur de la propriété que vous souhaitez spécifier.

Vous pouvez également créer vos propres options personnalisées si vous disposez des paramètres dont vous avez besoin au moment de l'exécution. L'implémentation hérite simplement de «PipelineOptions» et remplace les méthodes requises.

costom_options_sample.py


class CostomOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--hoge',
            type=int,
            default=0,
            help='This is Costom Value'
        )

Définir le pipeline BigQuery vers BigQuery

Définissons un pipeline qui lit réellement les données de BigQuery et les stocke dans BigQuery. À titre d'exemple simple, implémentons le processus d'extraction uniquement de l'identifiant de la table User et de l'insertion dans une autre table.

pipeline.py


def b2b_pipline(pipe: PCollection):

    #Décrivez le SQL exécuté
    query = "SELECT id, name, age FROM sample.users"
    
    _ = (pipe
        | "Read from BigQuery" >> beam.io.Read(BigQuerySource(query=query, use_standard_sql=True))
        | "Preprocess" >> beam.Map(lambda data: data["id"])
        | "Write to BigQuery" >> apache_beam.io.WriteToBigQuery(
                table="user_ids",
                schema="id:INTEGER",
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_APPEND,
            ) 
        )

Dans le traitement Pipline, trois opérations sont effectuées: entrée, opération intermédiaire et sortie. Il existe de nombreux types autres que ceux introduits cette fois, vous pouvez donc les personnaliser en vous référant à la Référence officielle.

À propos de l'environnement qui fonctionne réellement

Déplaçons le pipeline implémenté sur Local et GCP.

Vous avez le choix entre plusieurs environnements lors de l'exécution d'Apache Beam.

Cas d'utilisation Runner template_location
Exécuter en local DirectRunner None
Exécuter avec Dataflow DataflowRunner None
Exécuter en tant que modèle dans Dataflow DataflowRunner Spécifiez le chemin GCS pour enregistrer le modèle

L'enregistrement du modèle vous permet d'enregistrer le pipeline dans GCS et de le lancer à partir de la console ou de la ligne de commande. Ceci est très utile lorsque vous souhaitez exécuter Pipeline à temps.

À la fin

Cette fois, j'ai présenté comment implémenter ApcheBeam en Python et l'exécuter dans Dataflow. J'espère que cela vous sera utile.

Recommended Posts

Traitez le Big Data avec Dataflow (ApacheBeam) + Python3
Traiter les données Pubmed .xml avec python
Traiter les données Pubmed .xml avec python [Partie 2]
Analyse de données avec python 2
Analyse de données avec Python
Traiter les données csv avec python (traitement du comptage à l'aide de pandas)
Traitez le XML avec Python.
Lire des données json avec python
[Python] Obtenez des données économiques avec DataReader
Exécutez XGBoost avec Cloud Dataflow (Python)
Structure de données Python apprise avec la chimioinfomatique
Visualisez facilement vos données avec Python seaborn.
Analyse de données à partir de python (visualisation de données 1)
Analyse de données à partir de python (visualisation de données 2)
Application de Python: Nettoyage des données Partie 2: Nettoyage des données à l'aide de DataFrame
Obtenez des données supplémentaires vers LDAP avec python
Construction de pipeline de données avec Python et Luigi
Recevoir des données textuelles de mysql avec python
[Note] Obtenir des données de PostgreSQL avec Python
Obtenez des données alimentaires avec l'API Amazon (Python)
Essayez de travailler avec des données binaires en Python
Générer des données de test japonais avec Python Faker
Convertir des données Excel en JSON avec python
Téléchargez les données de cours des actions japonaises avec Python
Manipulation des données DynamoDB avec Lambda (Node et Python)
Convertissez des données FX 1 minute en données 5 minutes avec Python
Traiter plusieurs listes avec for en Python
Analyse de données à partir de python (pré-traitement des données-apprentissage automatique)
Organisez les données séparées par dossier avec Python
FizzBuzz en Python3
Grattage avec Python
Créez des données de test comme ça avec Python (partie 1)
Statistiques avec python
Lire les données avec python / netCDF> nc.variables [] / Vérifier la taille des données
Grattage avec Python
Lire les données csv Python avec Pandas ⇒ Graphique avec Matplotlib
Python avec Go
Analyse de données python
Lire les données de la table dans un fichier PDF avec Python
Intégrer avec Python
Obtenez des données sur le cours de l'action avec l'API Quandl [Python]
J'ai essayé d'obtenir des données CloudWatch avec Python
AES256 avec python
Testé avec Python
Une histoire sur la gestion des données binaires en Python
python commence par ()
Folium: Visualisez les données sur une carte avec Python
avec syntaxe (Python)
Bingo avec python
Écrire des données CSV sur AWS-S3 avec AWS-Lambda + Python
Zundokokiyoshi avec python
J'ai commencé l'apprentissage automatique avec le prétraitement des données Python
Excel avec Python
[python] Lecture de données
Micro-ordinateur avec Python
Extraire des données d'une page Web avec Python
Cast avec python
[Python] Créer un tableau structuré (stocker des données hétérogènes avec NumPy)