[PYTHON] I tried to build ML Pipeline with Cloud Composer

What to do in this article

We will orchestrate a series of tasks when performing the following machine learning using Cloud Composer of Google Cloud Platform.

  1. Put Train / Test data extracted from BigQuery into CSV file and put it in GCS
  2. Send a Training job to the ML Engine
  3. Deploy the model
  4. Send Prediction job to ML Engine
  5. Load the Prediction results from GCS into BigQuery

The node and workflow on Airflow to be created are as shown in the figure below. image.png

Target audience

--Those who have touched Cloud Composer / Air Flow --Those who have touched ML Engine / DataFlow

Language and framework

Airflow version

Architecture of each service of GCP

The above Airflow tasks can be expressed by GCP services as follows.

スクリーンショット 2019-12-30 14.36.56.png

1. Set up Cloud Composer preferences

The bash command below does three things.

1. Cloud Composer environment construction

One thing to be careful about when building the Cloud Composer environment is to specify --python-version 3 as an argument. By default, python2 series is set.

2. Install the library on airflow

In the task list shown at the beginning, there was a place to post a message in Slack. You need to install the slackclient library in airflow to perform this task. Specify the library configuration file in the --update-pypi-packages-from-file argument.

requirements.txt


slackclient~=1.3.2

3. Set environment variables on airflow

As mentioned above, ʻacccess_token is required when posting a message to slack using the slackclient library, so it is convenient to set ʻaccess_token in the environment variable of airflow, so set it in advance. (It is not so good to solidify ʻaccess_token` in the dag file)

#!/usr/bin/env bash

ENVIRONMENT_NAME=dev-composer
LOCATION=us-central1

#Read variables
eval `cat airflow/config/.secrets.conf`
echo ${slack_access_token}

#Creating an environment for cloud composer
gcloud composer environments create ${ENVIRONMENT_NAME} \
    --location ${LOCATION} \
    --python-version 3

#Install the library in the airflow environment
gcloud composer environments update ${ENVIRONMENT_NAME} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${LOCATION}

#Set environment variables on airflow
gcloud composer environments run \
  --location=${LOCATION} \
  ${ENVIRONMENT_NAME} \
  variables -- \
  --set slack_access_token ${slack_access_token} project_id ${project_id}

implementation of dag file

The dag file created this time is as follows. This is not enough to explain, so I will explain by separating the code for each task.

import os
import airflow
import datetime
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow import configuration
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.operators.mlengine_operator \
    import MLEngineTrainingOperator, MLEngineBatchPredictionOperator

BUCKET = 'gs://your_bucket'
PROJECT_ID = Variable.get('project_id')
REGION = 'us-central1'
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'
DATAFLOW_TRAIN_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 
    'dataflow', 'extract_train_data.py')

DATAFLOW_PRED_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'extract_pred_data.py')

DATAFLOW_LOAD_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'load.py')

DEFAULT_ARGS = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
    'project_id': Variable.get('project_id'),
    'dataflow_default_options': {
        'project': Variable.get('project_id'),
        'temp_location': 'gs://your_composer_bucket/temp',
        'runner': 'DataflowRunner'
    }
}

def get_date():
    jst_now = datetime.datetime.now()
    dt = datetime.datetime.strftime(jst_now, "%Y-%m-%d")
    return dt


with airflow.DAG(
        'asl_ml_pipeline',
        'catchup=False',
        default_args=DEFAULT_ARGS,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    start = DummyOperator(task_id='start')
    
    ####
    #Tasks for training in ML Engine
    ####

    job_id = 'dev-train-{}'.\
        format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
    job_dir = BUCKET + '/jobs/' + job_id

    submit_train_job = MLEngineTrainingOperator(
        task_id='train-model',
        project_id=PROJECT_ID,
        job_id=job_id,
        package_uris=[PACKAGE_URI],
        region=REGION,
        training_python_module='trainer.task',
        training_args=[f'--output_dir={OUTDIR}',
                       f'--job_dir={job_dir}',
                       '--dropout_rate=0.5',
                       '--batch_size=128',
                       '--train_step=1'
                       ],
        scale_tier='BASIC_GPU',
        python_version='3.5'
    )
    ####
    #Task to deploy model
    #### 

    BASE_VERSION_NAME = 'v1_0'
    VERSION_NAME = '{0}_{1}'.\
        format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
    MODEL_NAME = 'dev_train'

    deploy_model = BashOperator(
        task_id='deploy-model',
        bash_command='gcloud ml-engine versions create '
                     '{{ params.version_name}} '
                     '--model {{ params.model_name }} '
                     '--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
                     '--python-version="3.5" '
                     '--runtime-version=1.14 ',
        params={'version_name': VERSION_NAME,
                'model_name': MODEL_NAME}
    )
    
    ####
    #Tasks for batch prediction in ML Engine
    ####
    today = get_date()

    input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
    output_path = BUCKET + f'/result/{today}/'

    batch_prediction = MLEngineBatchPredictionOperator(
        task_id='batch-prediction',
        data_format='TEXT',
        region=REGION,
        job_id=job_id,
        input_paths=input_path,
        output_path=output_path,
        model_name=MODEL_NAME,
        version_name=VERSION_NAME
    )

    ####
    #Task to extract data with DataFlow
    ####
    job_args = {
        'output': 'gs://your_bucket/preprocess'
    }

    create_train_data = DataFlowPythonOperator(
        task_id='create-train-data',
        py_file=DATAFLOW_TRAIN_FILE,
        options=job_args
    )

    create_pred_data = DataFlowPythonOperator(
        task_id='create-pred-data',
        py_file=DATAFLOW_PRED_FILE,
        options=job_args
    )
    ####
    #Task to load data into BigQuery with DataFlow
    ####
    load_results = DataFlowPythonOperator(
        task_id='load_pred_results',
        py_file=DATAFLOW_LOAD_FILE
    )
        post_success_slack_train = SlackAPIPostOperator(
        task_id='post-success-train-to-slack',
        token=Variable.get('slack_access_token'),
        text='Train is succeeded',
        channel='#feed'
    )

    post_fail_slack_train = SlackAPIPostOperator(
        task_id='post-fail-train-to-slack',
        token=Variable.get('slack_access_token'),
        trigger_rule=TriggerRule.ONE_FAILED,
        text='Train is failed',
        channel='#feed'
    )
    ####
    #Task to POST a message to Slack
    ####
    post_success_slack_pred = SlackAPIPostOperator(
        task_id='post-success-pred-to-slack',
        token=Variable.get('slack_access_token'),
        text='Prediction is succeeded',
        channel='#feed'
    )

    post_fail_slack_pred = SlackAPIPostOperator(
        task_id='post-fail-pred-to-slack',
        token=Variable.get('slack_access_token'),
        trigger_rule=TriggerRule.ONE_FAILED,
        text='Prediction is failed',
        channel='#feed'
    )


    end = DummyOperator(task_id='end')

    start >> [create_train_data, create_pred_data] >> submit_train_job \
        >> [post_fail_slack_train, post_success_slack_train]
    post_fail_slack_train >> end

    post_success_slack_train >> deploy_model >> batch_prediction \
        >> load_results \
        >> [post_success_slack_pred, post_fail_slack_pred] >> end

Extract data from BigQuery and export to GCS

This is the first task we are doing in the Training Phase. (Red frame in the figure below) Data is extracted from BigQuery using DataFlow and placed in an appropriate bucket of GCS.

スクリーンショット 2019-12-29 16.43.40.png

Dag file description

--Constant DATAFLOW_TRAIN_FILE / DATAFLOW_PRED_FILE --File path where the DataFlow executable file is located --Files under the dags directory in the Bucket created when building the Cloud Composer environment are synchronized by the airflow worker every few seconds.

--Constant DEFAULT_ARGS --Set the environment variables when executing DataFlow in the argument of dataflow_default_options. --DataFlowPythonOperator class --Class for executing JOB using DataFlow as a python file --The py_file argument has the path where the executable file is located. --ʻOptions` Specify the argument to be passed to the executable file --This time, the file path for placing data in GCS is specified.


DATAFLOW_TRAIN_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 
    'dataflow', 'extract_train_data.py')

DATAFLOW_PRED_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'extract_pred_data.py')

DEFAULT_ARGS = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
    'project_id': Variable.get('project_id'),
    'dataflow_default_options': {
        'project': Variable.get('project_id'),
        'temp_location': 'gs://your_composer_bucket/temp',
        'runner': 'DataflowRunner'
    }
}
    ####
    #Task to extract data with DataFlow
    ####
    #File path when putting data in GCS
    job_args = {
        'output': 'gs://your_bucket/preprocess'
    }

    create_train_data = DataFlowPythonOperator(
        task_id='create-train-data',
        py_file=DATAFLOW_TRAIN_FILE,
        options=job_args
    )

    create_pred_data = DataFlowPythonOperator(
        task_id='create-pred-data',
        py_file=DATAFLOW_PRED_FILE,
        options=job_args
    )

DataFlow executable file description

The following file is a process to divide into train data and test data and put them in GCS for training. (Also, for the sake of simplicity of the article, I will omit the explanation of the query to divide it into train data and test data. Hashing is performed on the assumption that there is a timestamp column, and the remainder after division. (Divided by the value of)

There are two points to keep in mind here.

--Convert to CSV file ――Since you want to output it as a CSV file in the end, you need to convert the data extracted from BigQuery to a comma-separated format. --Here, we have prepared a function called to_csv.

--Python version of DataFlow --Since Python of DataFlow is 3.5.x, grammars such as f-strings added from 3.6 cannot be used.

import os
import argparse
import logging
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import \
    PipelineOptions

PROJECT = 'your_project_id'


def create_query(phase):
    base_query = """
    SELECT
        *,
        MOD(ABS(FARM_FINGERPRINT(CAST(timestamp AS STRING))), 10) AS hash_value
    FROM
        `dataset.your_table`
    """

    if phase == 'TRAIN':
        subsumple = """
        hash_value < 7
        """
    elif phase == 'TEST':
        subsumple = """
        hash_value >= 7
        """

    query = """
    SELECT 
        column1,
        column2,
        column3,
        row_number()over() as key
    FROM 
        ({0})
    WHERE {1}
    """.\
        format(base_query, subsumple)

    return query


def to_csv(line):
    csv_columns = 'column1,column2,column3,key'.split(',')
    rowstring = ','.join([str(line[k]) for k in csv_columns])
    return rowstring


def get_date():
    jst_now = datetime.now()
    dt = datetime.strftime(jst_now, "%Y-%m-%d")

    return dt


def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--output',
        required=True
    )

    known_args, pipeline_args = \
        parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        for phase in ['TRAIN', 'TEST']:
            query = create_query(phase)

            date = get_date()
            output_path = os.path.join(known_args.output, date,
                                       'train', "{}".format(phase))

            read = p | 'ExtractFromBigQuery_{}'.format(phase) >> beam.io.Read(
                beam.io.BigQuerySource(
                    project=PROJECT,
                    query=query,
                    use_standard_sql=True
                )
            )

            convert = read | 'ConvertToCSV_{}'.format(phase) >> beam.Map(to_csv)

            convert | 'WriteToGCS_{}'.format(phase) >> beam.io.Write(
                beam.io.WriteToText(output_path, file_name_suffix='.csv'))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Training and deploy on ML Engine

Here, we will not explain the tasks of training using ML Engine and deploying the trained model. Also, this time, I will omit the explanation of the executable file task.py and the model file model.py used in ML Engine.

スクリーンショット 2019-12-29 16.43.40.png

We are preparing a bucket for ML Engine this time. Therefore, please note that a total of two buckets are used properly when combined with the bucket created when building the Cloud Composer environment.

.
├── your_bucket 
│    ├── code //Model required for learning.py and task.Put a gz file with py etc.
│    │
│    └── trainde_model  //The trained model file is placed
│
└── your_composer_bucket //Bucket created when building cloud composer environment

Dag file description

--Constant PACKAGE_URI --File path where the ML Engine executable file is located --This time, the gz file containing trainer.py and model.py is placed under the above gs: // your_bucket / code. --This constant is specified in the package_uris argument of the MLEngineTrainingOperator class.

--BashOperator class --Class for executing bash command --This time, I am using the bash command when deploying the trained model file. --The model is deployed by executing gcloud ml-engine versions create. -(Probably) I think you can do the same with the MLEngineVersionOperator class, but this time I used the bash command.

PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'

    job_id = 'dev-train-{}'.\
        format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
    job_dir = BUCKET + '/jobs/' + job_id

    submit_train_job = MLEngineTrainingOperator(
        task_id='train-model',
        project_id=PROJECT_ID,
        job_id=job_id,
        package_uris=[PACKAGE_URI],
        region=REGION,
        training_python_module='trainer.task',
        training_args=[f'--output_dir={OUTDIR}',
                       f'--job_dir={job_dir}',
                       '--dropout_rate=0.5',
                       '--batch_size=128',
                       '--train_step=1'
                       ],
        scale_tier='BASIC_GPU',
        python_version='3.5'
    )
    today = get_date()

    BASE_VERSION_NAME = 'v1_0'
    VERSION_NAME = '{0}_{1}'.\
        format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
    MODEL_NAME = 'dev_model'

    deploy_model = BashOperator(
        task_id='deploy-model',
        bash_command='gcloud ml-engine versions create '
                     '{{ params.version_name}} '
                     '--model {{ params.model_name }} '
                     '--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
                     '--python-version="3.5" '
                     '--runtime-version=1.14 ',
        params={'version_name': VERSION_NAME,
                'model_name': MODEL_NAME}
    )

Batch prediction with ML Engine

Here, we will explain the task of making batch prediction using the model deployed earlier.

スクリーンショット 2019-12-29 16.43.40.png

--Constant ʻinput_path` --The path of the CSV file extracted and placed in GCS for the prediction made earlier

--Constant ʻoutput_path` --GCS file path to put the predicted result

Dag file description

    input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
    output_path = BUCKET + f'/result/{today}/'

    batch_prediction = MLEngineBatchPredictionOperator(
        task_id='batch-prediction',
        data_format='TEXT',
        region=REGION,
        job_id=job_id,
        input_paths=input_path,
        output_path=output_path,
        model_name=MODEL_NAME,
        version_name=VERSION_NAME
    )

Load batch prediction results into BigQuery

This section describes the task of loading the forecast results predicted earlier into BigQuery using DataFlow.

スクリーンショット 2019-12-30 14.36.56.png

Dag file description

For Dag files, it's similar to the "Extract data from BigQuery" task described at the beginning. The constant DATAFLOW_LOAD_FILE specifies the GCS file path where the DataFlow executable file is located.

DATAFLOW_LOAD_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'load.py')

    load_results = DataFlowPythonOperator(
        task_id='load_pred_results',
        py_file=DATAFLOW_LOAD_FILE
    )

DataFlow executable file description

In the following file, the file placed in GCS is read, converted to Json format, and loaded to an appropriate table of BigQuery. What you should be careful about here

--Batch prediction results are placed in GCS as a json format text file. --Since the value of the output result of prediction written in the text file is described as a list format, if you try to load it as it is, the type will not match and an error will occur. - {"key": [0], "prediction: [3.45...]"}


import logging
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

BUCKET_NAME = 'your_bucket'
INPUT = 'gs://{}/result/prediction.results-*'.format(BUCKET_NAME)


def convert(line):
    import json
    record = json.loads(line)
    return {'key': record['key'][0], 'predictions': record['predictions'][0]}


def run(argv=None):
    parser = argparse.ArgumentParser()

    known_args, pipeline_args = \
        parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    
    with beam.Pipeline(options=options) as p:
        dataset = 'your_dataset.results'
        
        read = p | 'ReadPredictionResult' >> beam.io.ReadFromText(INPUT)
        json = read | 'ConvertJson' >> beam.Map(convert)
        json | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
            dataset,
            schema='key:INTEGER, predictions:FLOAT',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()


in conclusion

I usually use AWS, but since I had the opportunity to touch on GCP services, I summarized ML-related services. I hope it will be helpful for those who are worried about the workflow of ML.

There is a place to deploy the model trained by ML Engine as it is, but it is not recommended. It is recommended to create a mechanism to measure / compare the accuracy of the model learned somehow, and deploy it after sandwiching the task.

Recommended Posts

I tried to build ML Pipeline with Cloud Composer
I tried to implement Autoencoder with TensorFlow
I tried to visualize AutoEncoder with TensorFlow
I tried to get started with Hy
I tried to summarize what was output with Qiita with Word cloud
I tried to implement CVAE with PyTorch
I tried to solve TSP with QAOA
[AWS] [GCP] I tried to make cloud services easy to use with Python
I tried to build a Mac Python development environment with pythonz + direnv
I tried to predict next year with AI
I tried to build a super-resolution method / ESPCN
I tried to detect Mario with pytorch + yolov3
I tried to implement reading Dataset with PyTorch
I tried to learn logical operations with TF Learn
I tried to move GAN (mnist) with keras
I tried to build a super-resolution method / SRCNN ①
I tried to save the data with discord
I tried to detect motion quickly with OpenCV
I tried to integrate with Keras in TFv1.1
I tried to get CloudWatch data with Python
I tried to output LLVM IR with Python
I tried to detect an object with M2Det!
I tried to automate sushi making with python
I tried to predict Titanic survival with PyCaret
I tried to build a super-resolution method / SRCNN ③
I tried to build a super-resolution method / SRCNN ②
I tried to operate Linux with Discord Bot
I tried to study DP with Fibonacci sequence
I tried to start Jupyter with Amazon lightsail
I tried to judge Tsundere with Naive Bayes
I tried to debug.
I tried to paste
I tried to build an environment of Ubuntu 20.04 LTS + ROS2 with Raspberry Pi 4
I tried to learn the sin function with chainer
I tried to move machine learning (ObjectDetection) with TouchDesigner
I tried to create a table only with Django
I tried to extract features with SIFT of OpenCV
I tried to move Faster R-CNN quickly with pytorch
I tried to read and save automatically with VOICEROID2 2
I tried to implement and learn DCGAN with PyTorch
I tried to implement Minesweeper on terminal with python
I tried to touch the CSV file with Python
I tried to draw a route map with Python
I tried to solve the soma cube with python
I tried to get started with blender python script_Part 02
I tried to generate ObjectId (primary key) with pymongo
I tried to implement time series prediction with GBDT
[Go + Gin] I tried to build a Docker environment
I tried to uncover our darkness with Chatwork API
I tried to automatically generate a password with Python3
[Introduction to Pytorch] I tried categorizing Cifar10 with VGG16 ♬
[ML Ops] I want to do multi-project with Python
I tried to solve the problem with Python Vol.1
I tried to analyze J League data with Python
I tried to implement Grad-CAM with keras and tensorflow
I tried to make an OCR application with PySimpleGUI
I tried to implement SSD with PyTorch now (Dataset)
I tried to interpolate Mask R-CNN with Optical Flow
I tried to step through Bayesian optimization. (With examples)
I tried to find an alternating series with tensorflow
[Introduction to AWS] I tried playing with voice-text conversion ♪