[PYTHON] J'ai essayé de créer un pipeline ML avec Cloud Composer

Que faire dans cet article

Nous orchestrerons une série de tâches lors de l'exécution du machine learning suivant à l'aide de Cloud Composer de Google Cloud Platform.

  1. Placez les données Train / Test extraites de BigQuery dans un fichier CSV et placez-les dans GCS.
  2. Envoyez une tâche d'entraînement à ML Engine
  3. Déployez le modèle
  4. Envoyer la tâche de prédiction à ML Engine
  5. Chargez le résultat de la prédiction stocké dans GCS dans BigQuery

Le nœud et le workflow sur Airflow à créer sont indiqués dans la figure ci-dessous. image.png

Public cible

Langage et cadre

Version Airflow

Architecture de chaque service de GCP

Les tâches Airflow ci-dessus peuvent être exprimées par les services GCP comme suit.

スクリーンショット 2019-12-30 14.36.56.png

1. Configurer les préférences de Cloud Composer

La commande bash ci-dessous fait trois choses.

1. Construction de l'environnement Cloud Composer

Une chose à laquelle il faut faire attention lors de la création de l'environnement Cloud Composer est de spécifier --python-version 3 comme argument. Par défaut, la série python2 est définie.

2. Installation de la bibliothèque sur Airflow

Dans la liste des tâches affichée au début, il y avait un endroit pour publier un message dans Slack. Vous devez installer la bibliothèque slackclient dans Airflow pour effectuer cette tâche. Spécifiez le fichier de configuration de la bibliothèque dans l'argument --update-pypi-packages-from-file.

requirements.txt


slackclient~=1.3.2

3. Définissez les variables d'environnement sur le flux d'air

Comme mentionné ci-dessus, ʻacccess_token est requis lors de l'envoi d'un message à slack en utilisant la bibliothèque slackclient, il est donc pratique de définir ʻaccess_token dans la variable d'environnement de flux d'air, donc définissez-le à l'avance. (Ce n'est pas si bon de solidifier ʻaccess_token` dans le fichier dag)

#!/usr/bin/env bash

ENVIRONMENT_NAME=dev-composer
LOCATION=us-central1

#Lire les variables
eval `cat airflow/config/.secrets.conf`
echo ${slack_access_token}

#Créer un environnement pour Cloud Composer
gcloud composer environments create ${ENVIRONMENT_NAME} \
    --location ${LOCATION} \
    --python-version 3

#Installez la bibliothèque dans l'environnement de flux d'air
gcloud composer environments update ${ENVIRONMENT_NAME} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${LOCATION}

#Définir les variables d'environnement sur le flux d'air
gcloud composer environments run \
  --location=${LOCATION} \
  ${ENVIRONMENT_NAME} \
  variables -- \
  --set slack_access_token ${slack_access_token} project_id ${project_id}

implémentation du fichier dag

Le fichier dag créé cette fois est le suivant. Cela ne suffit pas à expliquer, je vais donc l'expliquer en séparant le code pour chaque tâche.

import os
import airflow
import datetime
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow import configuration
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.operators.mlengine_operator \
    import MLEngineTrainingOperator, MLEngineBatchPredictionOperator

BUCKET = 'gs://your_bucket'
PROJECT_ID = Variable.get('project_id')
REGION = 'us-central1'
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'
DATAFLOW_TRAIN_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 
    'dataflow', 'extract_train_data.py')

DATAFLOW_PRED_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'extract_pred_data.py')

DATAFLOW_LOAD_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'load.py')

DEFAULT_ARGS = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
    'project_id': Variable.get('project_id'),
    'dataflow_default_options': {
        'project': Variable.get('project_id'),
        'temp_location': 'gs://your_composer_bucket/temp',
        'runner': 'DataflowRunner'
    }
}

def get_date():
    jst_now = datetime.datetime.now()
    dt = datetime.datetime.strftime(jst_now, "%Y-%m-%d")
    return dt


with airflow.DAG(
        'asl_ml_pipeline',
        'catchup=False',
        default_args=DEFAULT_ARGS,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    start = DummyOperator(task_id='start')
    
    ####
    #Tâches de formation sur ML Engine
    ####

    job_id = 'dev-train-{}'.\
        format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
    job_dir = BUCKET + '/jobs/' + job_id

    submit_train_job = MLEngineTrainingOperator(
        task_id='train-model',
        project_id=PROJECT_ID,
        job_id=job_id,
        package_uris=[PACKAGE_URI],
        region=REGION,
        training_python_module='trainer.task',
        training_args=[f'--output_dir={OUTDIR}',
                       f'--job_dir={job_dir}',
                       '--dropout_rate=0.5',
                       '--batch_size=128',
                       '--train_step=1'
                       ],
        scale_tier='BASIC_GPU',
        python_version='3.5'
    )
    ####
    #Tâche de déploiement du modèle
    #### 

    BASE_VERSION_NAME = 'v1_0'
    VERSION_NAME = '{0}_{1}'.\
        format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
    MODEL_NAME = 'dev_train'

    deploy_model = BashOperator(
        task_id='deploy-model',
        bash_command='gcloud ml-engine versions create '
                     '{{ params.version_name}} '
                     '--model {{ params.model_name }} '
                     '--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
                     '--python-version="3.5" '
                     '--runtime-version=1.14 ',
        params={'version_name': VERSION_NAME,
                'model_name': MODEL_NAME}
    )
    
    ####
    #Tâche de prédiction par lots avec ML Engine
    ####
    today = get_date()

    input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
    output_path = BUCKET + f'/result/{today}/'

    batch_prediction = MLEngineBatchPredictionOperator(
        task_id='batch-prediction',
        data_format='TEXT',
        region=REGION,
        job_id=job_id,
        input_paths=input_path,
        output_path=output_path,
        model_name=MODEL_NAME,
        version_name=VERSION_NAME
    )

    ####
    #Tâche d'extraction de données avec DataFlow
    ####
    job_args = {
        'output': 'gs://your_bucket/preprocess'
    }

    create_train_data = DataFlowPythonOperator(
        task_id='create-train-data',
        py_file=DATAFLOW_TRAIN_FILE,
        options=job_args
    )

    create_pred_data = DataFlowPythonOperator(
        task_id='create-pred-data',
        py_file=DATAFLOW_PRED_FILE,
        options=job_args
    )
    ####
    #Tâche de chargement de données dans BigQuery avec DataFlow
    ####
    load_results = DataFlowPythonOperator(
        task_id='load_pred_results',
        py_file=DATAFLOW_LOAD_FILE
    )
        post_success_slack_train = SlackAPIPostOperator(
        task_id='post-success-train-to-slack',
        token=Variable.get('slack_access_token'),
        text='Train is succeeded',
        channel='#feed'
    )

    post_fail_slack_train = SlackAPIPostOperator(
        task_id='post-fail-train-to-slack',
        token=Variable.get('slack_access_token'),
        trigger_rule=TriggerRule.ONE_FAILED,
        text='Train is failed',
        channel='#feed'
    )
    ####
    #Tâche pour POSTER un message à Slack
    ####
    post_success_slack_pred = SlackAPIPostOperator(
        task_id='post-success-pred-to-slack',
        token=Variable.get('slack_access_token'),
        text='Prediction is succeeded',
        channel='#feed'
    )

    post_fail_slack_pred = SlackAPIPostOperator(
        task_id='post-fail-pred-to-slack',
        token=Variable.get('slack_access_token'),
        trigger_rule=TriggerRule.ONE_FAILED,
        text='Prediction is failed',
        channel='#feed'
    )


    end = DummyOperator(task_id='end')

    start >> [create_train_data, create_pred_data] >> submit_train_job \
        >> [post_fail_slack_train, post_success_slack_train]
    post_fail_slack_train >> end

    post_success_slack_train >> deploy_model >> batch_prediction \
        >> load_results \
        >> [post_success_slack_pred, post_fail_slack_pred] >> end

Extraction de données de BigQuery et exportation vers GCS

C'est la première tâche que nous effectuons dans la phase de formation. (Cadre rouge dans la figure ci-dessous) Les données sont extraites de BigQuery à l'aide de DataFlow et placées dans un bucket approprié de GCS.

スクリーンショット 2019-12-29 16.43.40.png

Description du fichier DAG

--Constant DATAFLOW_TRAIN_FILE / DATAFLOW_PRED_FILE

--Constant DEFAULT_ARGS --Définissez les variables d'environnement lors de l'exécution de DataFlow dans l'argument de dataflow_default_options.


DATAFLOW_TRAIN_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 
    'dataflow', 'extract_train_data.py')

DATAFLOW_PRED_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'extract_pred_data.py')

DEFAULT_ARGS = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
    'project_id': Variable.get('project_id'),
    'dataflow_default_options': {
        'project': Variable.get('project_id'),
        'temp_location': 'gs://your_composer_bucket/temp',
        'runner': 'DataflowRunner'
    }
}
    ####
    #Tâche d'extraction de données avec DataFlow
    ####
    #Chemin du fichier lors de la mise des données dans GCS
    job_args = {
        'output': 'gs://your_bucket/preprocess'
    }

    create_train_data = DataFlowPythonOperator(
        task_id='create-train-data',
        py_file=DATAFLOW_TRAIN_FILE,
        options=job_args
    )

    create_pred_data = DataFlowPythonOperator(
        task_id='create-pred-data',
        py_file=DATAFLOW_PRED_FILE,
        options=job_args
    )

Description du fichier exécutable DataFlow

Le fichier suivant est un processus à diviser en «données de train» et «données de test» et à les mettre dans GCS pour la formation. (Aussi, dans un souci de simplification de l'article, je vais omettre l'explication de la requête pour diviser en "données de train" et "données de test". Le hachage est effectué en supposant qu'il existe une colonne d'horodatage, et le reste après la division (Divisé par la valeur de)

Il y a deux points à prendre en compte ici.

--Convertir en fichier CSV

--Version Python de DataFlow

import os
import argparse
import logging
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import \
    PipelineOptions

PROJECT = 'your_project_id'


def create_query(phase):
    base_query = """
    SELECT
        *,
        MOD(ABS(FARM_FINGERPRINT(CAST(timestamp AS STRING))), 10) AS hash_value
    FROM
        `dataset.your_table`
    """

    if phase == 'TRAIN':
        subsumple = """
        hash_value < 7
        """
    elif phase == 'TEST':
        subsumple = """
        hash_value >= 7
        """

    query = """
    SELECT 
        column1,
        column2,
        column3,
        row_number()over() as key
    FROM 
        ({0})
    WHERE {1}
    """.\
        format(base_query, subsumple)

    return query


def to_csv(line):
    csv_columns = 'column1,column2,column3,key'.split(',')
    rowstring = ','.join([str(line[k]) for k in csv_columns])
    return rowstring


def get_date():
    jst_now = datetime.now()
    dt = datetime.strftime(jst_now, "%Y-%m-%d")

    return dt


def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--output',
        required=True
    )

    known_args, pipeline_args = \
        parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        for phase in ['TRAIN', 'TEST']:
            query = create_query(phase)

            date = get_date()
            output_path = os.path.join(known_args.output, date,
                                       'train', "{}".format(phase))

            read = p | 'ExtractFromBigQuery_{}'.format(phase) >> beam.io.Read(
                beam.io.BigQuerySource(
                    project=PROJECT,
                    query=query,
                    use_standard_sql=True
                )
            )

            convert = read | 'ConvertToCSV_{}'.format(phase) >> beam.Map(to_csv)

            convert | 'WriteToGCS_{}'.format(phase) >> beam.io.Write(
                beam.io.WriteToText(output_path, file_name_suffix='.csv'))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Formation et déploiement sur ML Engine

Ici, nous n'expliquerons pas les tâches d'entraînement à l'aide de ML Engine et de déploiement du modèle entraîné. De plus, cette fois, j'omettrai l'explication du fichier d'exécution «task.py» et du fichier modèle «model.py» utilisé dans ML Engine.

スクリーンショット 2019-12-29 16.43.40.png

Nous préparons cette fois un seau pour ML Engine. Par conséquent, veuillez noter qu'un total de deux buckets est utilisé correctement lorsqu'ils sont combinés avec le bucket créé lors de la création de l'environnement Cloud Composer.

.
├── your_bucket 
│    ├── code //Modèle requis pour l'apprentissage.py et tâche.Mettez un fichier gz avec py etc.
│    │
│    └── trainde_model  //Le fichier de modèle entraîné est placé
│
└── your_composer_bucket //Bucket créé lors de la création de l'environnement Cloud Composer

Description du fichier DAG

--Constant PACKAGE_URI

PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'

    job_id = 'dev-train-{}'.\
        format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
    job_dir = BUCKET + '/jobs/' + job_id

    submit_train_job = MLEngineTrainingOperator(
        task_id='train-model',
        project_id=PROJECT_ID,
        job_id=job_id,
        package_uris=[PACKAGE_URI],
        region=REGION,
        training_python_module='trainer.task',
        training_args=[f'--output_dir={OUTDIR}',
                       f'--job_dir={job_dir}',
                       '--dropout_rate=0.5',
                       '--batch_size=128',
                       '--train_step=1'
                       ],
        scale_tier='BASIC_GPU',
        python_version='3.5'
    )
    today = get_date()

    BASE_VERSION_NAME = 'v1_0'
    VERSION_NAME = '{0}_{1}'.\
        format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
    MODEL_NAME = 'dev_model'

    deploy_model = BashOperator(
        task_id='deploy-model',
        bash_command='gcloud ml-engine versions create '
                     '{{ params.version_name}} '
                     '--model {{ params.model_name }} '
                     '--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
                     '--python-version="3.5" '
                     '--runtime-version=1.14 ',
        params={'version_name': VERSION_NAME,
                'model_name': MODEL_NAME}
    )

Prédiction par lots avec ML Engine

Ici, nous allons expliquer la tâche de faire une prédiction par lots en utilisant le modèle déployé précédemment.

スクリーンショット 2019-12-29 16.43.40.png

--Constant ʻinput_path`

--Constant ʻoutput_path`

Description du fichier DAG

    input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
    output_path = BUCKET + f'/result/{today}/'

    batch_prediction = MLEngineBatchPredictionOperator(
        task_id='batch-prediction',
        data_format='TEXT',
        region=REGION,
        job_id=job_id,
        input_paths=input_path,
        output_path=output_path,
        model_name=MODEL_NAME,
        version_name=VERSION_NAME
    )

Chargez le résultat de la prédiction par lots dans BigQuery

Cette section décrit la tâche de chargement du résultat de prédiction prévu précédemment dans BigQuery à l'aide de DataFlow.

スクリーンショット 2019-12-30 14.36.56.png

Description du fichier DAG

Pour les fichiers Dag, elle est similaire à la tâche "Extraire les données de BigQuery" décrite au début. La constante DATAFLOW_LOAD_FILE spécifie le chemin du fichier GCS où se trouve le fichier exécutable DataFlow.

DATAFLOW_LOAD_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'load.py')

    load_results = DataFlowPythonOperator(
        task_id='load_pred_results',
        py_file=DATAFLOW_LOAD_FILE
    )

Description du fichier exécutable DataFlow

Dans le fichier suivant, le fichier placé dans GCS est lu, converti au format Json et chargé dans une table appropriée de BigQuery. À quoi vous devez faire attention ici


import logging
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

BUCKET_NAME = 'your_bucket'
INPUT = 'gs://{}/result/prediction.results-*'.format(BUCKET_NAME)


def convert(line):
    import json
    record = json.loads(line)
    return {'key': record['key'][0], 'predictions': record['predictions'][0]}


def run(argv=None):
    parser = argparse.ArgumentParser()

    known_args, pipeline_args = \
        parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    
    with beam.Pipeline(options=options) as p:
        dataset = 'your_dataset.results'
        
        read = p | 'ReadPredictionResult' >> beam.io.ReadFromText(INPUT)
        json = read | 'ConvertJson' >> beam.Map(convert)
        json | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
            dataset,
            schema='key:INTEGER, predictions:FLOAT',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()


en conclusion

J'utilise généralement AWS, mais depuis que j'ai eu l'occasion de parler des services GCP, j'ai résumé les services liés au ML. J'espère que cela sera utile pour ceux qui s'inquiètent du flux de travail ML.

Il existe un endroit pour déployer le modèle formé avec ML Engine tel quel, mais ce n'est pas recommandé. Nous vous recommandons de créer un mécanisme pour mesurer / comparer la précision du modèle que vous avez appris et de le déployer après avoir pris en sandwich la tâche.

Recommended Posts

J'ai essayé de créer un pipeline ML avec Cloud Composer
J'ai essayé d'implémenter Autoencoder avec TensorFlow
J'ai essayé de visualiser AutoEncoder avec TensorFlow
J'ai essayé de commencer avec Hy
J'ai essayé de résumer ce qui était sorti avec Qiita avec Word cloud
J'ai essayé d'implémenter CVAE avec PyTorch
J'ai essayé de résoudre TSP avec QAOA
[AWS] [GCP] J'ai essayé de rendre les services cloud faciles à utiliser avec Python
J'ai essayé de créer un environnement de développement Mac Python avec pythonz + direnv
J'ai essayé de prédire l'année prochaine avec l'IA
J'ai essayé de créer une méthode de super résolution / ESPCN
J'ai essayé d'implémenter la lecture de Dataset avec PyTorch
J'ai essayé d'apprendre le fonctionnement logique avec TF Learn
J'ai essayé de déplacer GAN (mnist) avec keras
J'ai essayé de créer une méthode de super résolution / SRCNN ①
J'ai essayé de sauvegarder les données avec discorde
J'ai essayé de détecter rapidement un mouvement avec OpenCV
J'ai essayé d'intégrer Keras dans TFv1.1
J'ai essayé d'obtenir des données CloudWatch avec Python
J'ai essayé de sortir LLVM IR avec Python
J'ai essayé de détecter un objet avec M2Det!
J'ai essayé d'automatiser la fabrication des sushis avec python
J'ai essayé de prédire la survie du Titanic avec PyCaret
J'ai essayé de créer une méthode de super résolution / SRCNN ③
J'ai essayé de créer une méthode de super résolution / SRCNN ②
J'ai essayé d'utiliser Linux avec Discord Bot
J'ai essayé d'étudier DP avec séquence de Fibonacci
J'ai essayé de démarrer Jupyter avec toutes les lumières d'Amazon
J'ai essayé de juger Tundele avec Naive Bays
J'ai essayé de déboguer.
J'ai essayé de créer un environnement Ubuntu 20.04 LTS + ROS2 avec Raspberry Pi 4
J'ai essayé d'entraîner la fonction péché avec chainer
J'ai essayé de déplacer l'apprentissage automatique (détection d'objet) avec TouchDesigner
J'ai essayé d'extraire des fonctionnalités avec SIFT d'OpenCV
J'ai essayé de déplacer Faster R-CNN rapidement avec pytorch
J'ai essayé de lire et d'enregistrer automatiquement avec VOICEROID2 2
J'ai essayé d'implémenter et d'apprendre DCGAN avec PyTorch
J'ai essayé d'implémenter Mine Sweeper sur un terminal avec python
J'ai essayé de toucher un fichier CSV avec Python
J'ai essayé de résoudre Soma Cube avec python
J'ai essayé de démarrer avec le script python de blender_Partie 02
J'ai essayé de générer ObjectId (clé primaire) avec pymongo
[Go + Gin] J'ai essayé de créer un environnement Docker
J'ai essayé de découvrir notre obscurité avec l'API Chatwork
[Introduction à Pytorch] J'ai essayé de catégoriser Cifar10 avec VGG16 ♬
J'ai essayé de résoudre le problème avec Python Vol.1
J'ai essayé d'implémenter Grad-CAM avec keras et tensorflow
J'ai essayé de créer une application OCR avec PySimpleGUI
J'ai essayé d'implémenter SSD avec PyTorch maintenant (Dataset)
J'ai essayé d'interpoler le masque R-CNN avec un flux optique
J'ai essayé de passer par l'optimisation bayésienne. (Avec des exemples)
J'ai essayé de trouver la classe alternative avec tensorflow
[Introduction à AWS] J'ai essayé de jouer avec la conversion voix-texte ♪