[PYTHON] Ich habe versucht, eine ML-Pipeline mit Cloud Composer zu erstellen

Was ist in diesem Artikel zu tun?

Wir werden eine Reihe von Aufgaben koordinieren, wenn wir das folgende maschinelle Lernen mit Cloud Composer von Google Cloud Platform durchführen.

  1. Fügen Sie die aus BigQuery extrahierten Train / Test-Daten in eine CSV-Datei ein und fügen Sie sie in GCS ein.
  2. Senden Sie einen Schulungsjob an die ML Engine
  3. Stellen Sie das Modell bereit
  4. Senden Sie den Vorhersagejob an ML Engine
  5. Laden Sie das in GCS gespeicherte Vorhersageergebnis in BigQuery

Der Knoten und der Workflow für den zu erstellenden Luftstrom sind in der folgenden Abbildung dargestellt. image.png

Zielgruppe

Sprache und Rahmen

Luftstromversion

Architektur jedes Dienstes von GCP

Die oben genannten Airflow-Aufgaben können von den GCP-Diensten wie folgt ausgedrückt werden.

スクリーンショット 2019-12-30 14.36.56.png

1. Richten Sie die Cloud Composer-Einstellungen ein

Der folgende Bash-Befehl führt drei Dinge aus.

1. Aufbau einer Cloud Composer-Umgebung

Eine Sache, die Sie beim Erstellen der Cloud Composer-Umgebung beachten sollten, ist die Angabe von "--python-version 3" als Argument. Standardmäßig ist die Python2-Serie festgelegt.

2. Installieren der Bibliothek im Luftstrom

In der am Anfang angezeigten Aufgabenliste gab es einen Platz zum Posten einer Nachricht in Slack. Sie müssen die Bibliothek "slackclient" im Luftstrom installieren, um diese Aufgabe auszuführen. Geben Sie die Bibliothekskonfigurationsdatei im Argument "--update-pypi-packages-from-file" an.

requirements.txt


slackclient~=1.3.2

3. Stellen Sie Umgebungsvariablen für den Luftstrom ein

Wie oben erwähnt, ist "acccess_token" erforderlich, wenn eine Nachricht mithilfe der slackclient-Bibliothek an slack gesendet wird. Daher ist es zweckmäßig, "access_token" in der Umgebungsvariablen "Luftstrom" festzulegen. Stellen Sie sie daher im Voraus ein. (Es ist nicht so gut, access_token in der dag-Datei zu verfestigen)

#!/usr/bin/env bash

ENVIRONMENT_NAME=dev-composer
LOCATION=us-central1

#Variablen lesen
eval `cat airflow/config/.secrets.conf`
echo ${slack_access_token}

#Erstellen einer Umgebung für Cloud Composer
gcloud composer environments create ${ENVIRONMENT_NAME} \
    --location ${LOCATION} \
    --python-version 3

#Installieren Sie die Bibliothek in der Luftstromumgebung
gcloud composer environments update ${ENVIRONMENT_NAME} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${LOCATION}

#Stellen Sie Umgebungsvariablen für den Luftstrom ein
gcloud composer environments run \
  --location=${LOCATION} \
  ${ENVIRONMENT_NAME} \
  variables -- \
  --set slack_access_token ${slack_access_token} project_id ${project_id}

Implementierung der DAG-Datei

Die diesmal erstellte DAG-Datei lautet wie folgt. Dies reicht nicht aus, um es zu erklären, daher werde ich es erklären, indem ich den Code für jede Aufgabe trenne.

import os
import airflow
import datetime
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow import configuration
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.operators.mlengine_operator \
    import MLEngineTrainingOperator, MLEngineBatchPredictionOperator

BUCKET = 'gs://your_bucket'
PROJECT_ID = Variable.get('project_id')
REGION = 'us-central1'
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'
DATAFLOW_TRAIN_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 
    'dataflow', 'extract_train_data.py')

DATAFLOW_PRED_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'extract_pred_data.py')

DATAFLOW_LOAD_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'load.py')

DEFAULT_ARGS = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
    'project_id': Variable.get('project_id'),
    'dataflow_default_options': {
        'project': Variable.get('project_id'),
        'temp_location': 'gs://your_composer_bucket/temp',
        'runner': 'DataflowRunner'
    }
}

def get_date():
    jst_now = datetime.datetime.now()
    dt = datetime.datetime.strftime(jst_now, "%Y-%m-%d")
    return dt


with airflow.DAG(
        'asl_ml_pipeline',
        'catchup=False',
        default_args=DEFAULT_ARGS,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    start = DummyOperator(task_id='start')
    
    ####
    #Aufgaben zum Trainieren auf ML Engine
    ####

    job_id = 'dev-train-{}'.\
        format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
    job_dir = BUCKET + '/jobs/' + job_id

    submit_train_job = MLEngineTrainingOperator(
        task_id='train-model',
        project_id=PROJECT_ID,
        job_id=job_id,
        package_uris=[PACKAGE_URI],
        region=REGION,
        training_python_module='trainer.task',
        training_args=[f'--output_dir={OUTDIR}',
                       f'--job_dir={job_dir}',
                       '--dropout_rate=0.5',
                       '--batch_size=128',
                       '--train_step=1'
                       ],
        scale_tier='BASIC_GPU',
        python_version='3.5'
    )
    ####
    #Aufgabe zum Bereitstellen des Modells
    #### 

    BASE_VERSION_NAME = 'v1_0'
    VERSION_NAME = '{0}_{1}'.\
        format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
    MODEL_NAME = 'dev_train'

    deploy_model = BashOperator(
        task_id='deploy-model',
        bash_command='gcloud ml-engine versions create '
                     '{{ params.version_name}} '
                     '--model {{ params.model_name }} '
                     '--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
                     '--python-version="3.5" '
                     '--runtime-version=1.14 ',
        params={'version_name': VERSION_NAME,
                'model_name': MODEL_NAME}
    )
    
    ####
    #Aufgabe zur Stapelvorhersage mit ML Engine
    ####
    today = get_date()

    input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
    output_path = BUCKET + f'/result/{today}/'

    batch_prediction = MLEngineBatchPredictionOperator(
        task_id='batch-prediction',
        data_format='TEXT',
        region=REGION,
        job_id=job_id,
        input_paths=input_path,
        output_path=output_path,
        model_name=MODEL_NAME,
        version_name=VERSION_NAME
    )

    ####
    #Aufgabe zum Extrahieren von Daten mit DataFlow
    ####
    job_args = {
        'output': 'gs://your_bucket/preprocess'
    }

    create_train_data = DataFlowPythonOperator(
        task_id='create-train-data',
        py_file=DATAFLOW_TRAIN_FILE,
        options=job_args
    )

    create_pred_data = DataFlowPythonOperator(
        task_id='create-pred-data',
        py_file=DATAFLOW_PRED_FILE,
        options=job_args
    )
    ####
    #Aufgabe zum Laden von Daten in BigQuery mit DataFlow
    ####
    load_results = DataFlowPythonOperator(
        task_id='load_pred_results',
        py_file=DATAFLOW_LOAD_FILE
    )
        post_success_slack_train = SlackAPIPostOperator(
        task_id='post-success-train-to-slack',
        token=Variable.get('slack_access_token'),
        text='Train is succeeded',
        channel='#feed'
    )

    post_fail_slack_train = SlackAPIPostOperator(
        task_id='post-fail-train-to-slack',
        token=Variable.get('slack_access_token'),
        trigger_rule=TriggerRule.ONE_FAILED,
        text='Train is failed',
        channel='#feed'
    )
    ####
    #Aufgabe, eine Nachricht an Slack zu senden
    ####
    post_success_slack_pred = SlackAPIPostOperator(
        task_id='post-success-pred-to-slack',
        token=Variable.get('slack_access_token'),
        text='Prediction is succeeded',
        channel='#feed'
    )

    post_fail_slack_pred = SlackAPIPostOperator(
        task_id='post-fail-pred-to-slack',
        token=Variable.get('slack_access_token'),
        trigger_rule=TriggerRule.ONE_FAILED,
        text='Prediction is failed',
        channel='#feed'
    )


    end = DummyOperator(task_id='end')

    start >> [create_train_data, create_pred_data] >> submit_train_job \
        >> [post_fail_slack_train, post_success_slack_train]
    post_fail_slack_train >> end

    post_success_slack_train >> deploy_model >> batch_prediction \
        >> load_results \
        >> [post_success_slack_pred, post_fail_slack_pred] >> end

Extrahieren von Daten aus BigQuery und Exportieren nach GCS

Dies ist die erste Aufgabe, die wir in der Trainingsphase ausführen. (Roter Rahmen in der Abbildung unten) Daten werden mit DataFlow aus BigQuery extrahiert und in einen geeigneten Eimer GCS gestellt.

スクリーンショット 2019-12-29 16.43.40.png

Beschreibung der Dag-Datei


DATAFLOW_TRAIN_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 
    'dataflow', 'extract_train_data.py')

DATAFLOW_PRED_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'extract_pred_data.py')

DEFAULT_ARGS = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
    'project_id': Variable.get('project_id'),
    'dataflow_default_options': {
        'project': Variable.get('project_id'),
        'temp_location': 'gs://your_composer_bucket/temp',
        'runner': 'DataflowRunner'
    }
}
    ####
    #Aufgabe zum Extrahieren von Daten mit DataFlow
    ####
    #Dateipfad beim Einfügen von Daten in GCS
    job_args = {
        'output': 'gs://your_bucket/preprocess'
    }

    create_train_data = DataFlowPythonOperator(
        task_id='create-train-data',
        py_file=DATAFLOW_TRAIN_FILE,
        options=job_args
    )

    create_pred_data = DataFlowPythonOperator(
        task_id='create-pred-data',
        py_file=DATAFLOW_PRED_FILE,
        options=job_args
    )

Beschreibung der ausführbaren DataFlow-Datei

Die folgende Datei ist ein Prozess zum Aufteilen in "Zugdaten" und "Testdaten" und zum Einfügen in GCS für das Training. (Zur Vereinfachung des Artikels werde ich auch die Erläuterung der Abfrage zur Unterteilung in "Zugdaten" und "Testdaten" weglassen. Das Hashing wird unter der Annahme durchgeführt, dass eine Zeitstempelspalte vorhanden ist, und der Rest nach der Unterteilung (Dividiert durch den Wert von)

Hier sind zwei Punkte zu beachten.

import os
import argparse
import logging
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import \
    PipelineOptions

PROJECT = 'your_project_id'


def create_query(phase):
    base_query = """
    SELECT
        *,
        MOD(ABS(FARM_FINGERPRINT(CAST(timestamp AS STRING))), 10) AS hash_value
    FROM
        `dataset.your_table`
    """

    if phase == 'TRAIN':
        subsumple = """
        hash_value < 7
        """
    elif phase == 'TEST':
        subsumple = """
        hash_value >= 7
        """

    query = """
    SELECT 
        column1,
        column2,
        column3,
        row_number()over() as key
    FROM 
        ({0})
    WHERE {1}
    """.\
        format(base_query, subsumple)

    return query


def to_csv(line):
    csv_columns = 'column1,column2,column3,key'.split(',')
    rowstring = ','.join([str(line[k]) for k in csv_columns])
    return rowstring


def get_date():
    jst_now = datetime.now()
    dt = datetime.strftime(jst_now, "%Y-%m-%d")

    return dt


def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--output',
        required=True
    )

    known_args, pipeline_args = \
        parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        for phase in ['TRAIN', 'TEST']:
            query = create_query(phase)

            date = get_date()
            output_path = os.path.join(known_args.output, date,
                                       'train', "{}".format(phase))

            read = p | 'ExtractFromBigQuery_{}'.format(phase) >> beam.io.Read(
                beam.io.BigQuerySource(
                    project=PROJECT,
                    query=query,
                    use_standard_sql=True
                )
            )

            convert = read | 'ConvertToCSV_{}'.format(phase) >> beam.Map(to_csv)

            convert | 'WriteToGCS_{}'.format(phase) >> beam.io.Write(
                beam.io.WriteToText(output_path, file_name_suffix='.csv'))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Schulung und Bereitstellung auf ML Engine

Hier werden die Aufgaben des Trainings mit ML Engine und des Einsatzes des trainierten Modells nicht erläutert. Auch dieses Mal werde ich die Erklärung der Ausführungsdatei task.py und der in ML Engine verwendeten Modelldatei model.py weglassen.

スクリーンショット 2019-12-29 16.43.40.png

Diesmal bereiten wir einen Eimer für ML Engine vor. Beachten Sie daher, dass insgesamt zwei Buckets ordnungsgemäß verwendet werden, wenn sie mit dem Bucket kombiniert werden, das beim Erstellen der Cloud Composer-Umgebung erstellt wurde.

.
├── your_bucket 
│    ├── code //Modell zum Lernen erforderlich.py und Aufgabe.Legen Sie eine gz-Datei mit py usw.
│    │
│    └── trainde_model  //Die trainierte Modelldatei wird platziert
│
└── your_composer_bucket //Bucket erstellt beim Erstellen einer Cloud Composer-Umgebung

Beschreibung der Dag-Datei

PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'

    job_id = 'dev-train-{}'.\
        format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
    job_dir = BUCKET + '/jobs/' + job_id

    submit_train_job = MLEngineTrainingOperator(
        task_id='train-model',
        project_id=PROJECT_ID,
        job_id=job_id,
        package_uris=[PACKAGE_URI],
        region=REGION,
        training_python_module='trainer.task',
        training_args=[f'--output_dir={OUTDIR}',
                       f'--job_dir={job_dir}',
                       '--dropout_rate=0.5',
                       '--batch_size=128',
                       '--train_step=1'
                       ],
        scale_tier='BASIC_GPU',
        python_version='3.5'
    )
    today = get_date()

    BASE_VERSION_NAME = 'v1_0'
    VERSION_NAME = '{0}_{1}'.\
        format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
    MODEL_NAME = 'dev_model'

    deploy_model = BashOperator(
        task_id='deploy-model',
        bash_command='gcloud ml-engine versions create '
                     '{{ params.version_name}} '
                     '--model {{ params.model_name }} '
                     '--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
                     '--python-version="3.5" '
                     '--runtime-version=1.14 ',
        params={'version_name': VERSION_NAME,
                'model_name': MODEL_NAME}
    )

Chargenvorhersage mit ML Engine

Hier erklären wir die Aufgabe der Stapelvorhersage anhand des zuvor bereitgestellten Modells.

スクリーンショット 2019-12-29 16.43.40.png

Beschreibung der Dag-Datei

    input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
    output_path = BUCKET + f'/result/{today}/'

    batch_prediction = MLEngineBatchPredictionOperator(
        task_id='batch-prediction',
        data_format='TEXT',
        region=REGION,
        job_id=job_id,
        input_paths=input_path,
        output_path=output_path,
        model_name=MODEL_NAME,
        version_name=VERSION_NAME
    )

Laden Sie das Ergebnis der Stapelvorhersage in BigQuery

In diesem Abschnitt wird die Aufgabe beschrieben, das zuvor vorhergesagte Vorhersageergebnis mithilfe von DataFlow in BigQuery zu laden.

スクリーンショット 2019-12-30 14.36.56.png

Beschreibung der Dag-Datei

Bei Dag-Dateien ähnelt es der eingangs beschriebenen Aufgabe "Daten aus BigQuery extrahieren". Die Konstante DATAFLOW_LOAD_FILE gibt den GCS-Dateipfad an, in dem sich die ausführbare DataFlow-Datei befindet.

DATAFLOW_LOAD_FILE = os.path.join(
    configuration.get('core', 'dags_folder'),
    'dataflow', 'load.py')

    load_results = DataFlowPythonOperator(
        task_id='load_pred_results',
        py_file=DATAFLOW_LOAD_FILE
    )

Beschreibung der ausführbaren DataFlow-Datei

In der folgenden Datei wird die in GCS platzierte Datei gelesen, in das Json-Format konvertiert und in eine entsprechende BigQuery-Tabelle geladen. Worauf sollten Sie hier achten?


import logging
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

BUCKET_NAME = 'your_bucket'
INPUT = 'gs://{}/result/prediction.results-*'.format(BUCKET_NAME)


def convert(line):
    import json
    record = json.loads(line)
    return {'key': record['key'][0], 'predictions': record['predictions'][0]}


def run(argv=None):
    parser = argparse.ArgumentParser()

    known_args, pipeline_args = \
        parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    
    with beam.Pipeline(options=options) as p:
        dataset = 'your_dataset.results'
        
        read = p | 'ReadPredictionResult' >> beam.io.ReadFromText(INPUT)
        json = read | 'ConvertJson' >> beam.Map(convert)
        json | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
            dataset,
            schema='key:INTEGER, predictions:FLOAT',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()


abschließend

Normalerweise verwende ich AWS, aber da ich die Möglichkeit hatte, auf GCP-Dienste einzugehen, habe ich ML-bezogene Dienste zusammengefasst. Ich hoffe, es ist hilfreich für diejenigen, die sich Sorgen um den ML-Workflow machen.

Es gibt einen Ort, an dem das mit ML Engine trainierte Modell bereitgestellt werden kann, dies wird jedoch nicht empfohlen. Wir empfehlen, dass Sie einen Mechanismus erstellen, um die Genauigkeit des erlernten Modells zu messen / zu vergleichen und ihn nach dem Einfügen der Aufgabe bereitzustellen.

Recommended Posts

Ich habe versucht, eine ML-Pipeline mit Cloud Composer zu erstellen
Ich habe versucht, Autoencoder mit TensorFlow zu implementieren
Ich habe versucht, AutoEncoder mit TensorFlow zu visualisieren
Ich habe versucht, mit Hy anzufangen
Ich habe versucht zusammenzufassen, was mit Qiita mit Word Cloud ausgegeben wurde
Ich habe versucht, CVAE mit PyTorch zu implementieren
Ich habe versucht, TSP mit QAOA zu lösen
[AWS] [GCP] Ich habe versucht, die Verwendung von Cloud-Diensten mit Python zu vereinfachen
Ich habe versucht, eine Mac Python-Entwicklungsumgebung mit pythonz + direnv zu erstellen
Ich habe versucht, nächstes Jahr mit AI vorherzusagen
Ich habe versucht, eine Super-Resolution-Methode / ESPCN zu erstellen
Ich habe versucht, das Lesen von Dataset mit PyTorch zu implementieren
Ich habe versucht, mit TF Learn die logische Operation zu lernen
Ich habe versucht, GAN (mnist) mit Keras zu bewegen
Ich habe versucht, eine Super-Resolution-Methode / SRCNN build zu erstellen
Ich habe versucht, die Daten mit Zwietracht zu speichern
Ich habe versucht, mit OpenCV Bewegungen schnell zu erkennen
Ich habe versucht, Keras in TFv1.1 zu integrieren
Ich habe versucht, CloudWatch-Daten mit Python abzurufen
Ich habe versucht, LLVM IR mit Python auszugeben
Ich habe versucht, ein Objekt mit M2Det zu erkennen!
Ich habe versucht, die Herstellung von Sushi mit Python zu automatisieren
Ich habe versucht, das Überleben der Titanic mit PyCaret vorherzusagen
Ich habe versucht, eine Super-Resolution-Methode / SRCNN build zu erstellen
Ich habe versucht, eine Super-Resolution-Methode / SRCNN build zu erstellen
Ich habe versucht, Linux mit Discord Bot zu betreiben
Ich habe versucht, DP mit Fibonacci-Sequenz zu studieren
Ich habe versucht, Jupyter mit allen Amazon-Lichtern zu starten
Ich habe versucht, Tundele mit Naive Bays zu beurteilen
Ich habe versucht zu debuggen.
Ich habe versucht, mit Raspberry Pi 4 eine Umgebung von Ubuntu 20.04 LTS + ROS2 zu erstellen
Ich habe versucht, die Sündenfunktion mit Chainer zu trainieren
Ich habe versucht, maschinelles Lernen (Objekterkennung) mit TouchDesigner zu verschieben
Ich habe versucht, Funktionen mit SIFT von OpenCV zu extrahieren
Ich habe versucht, Faster R-CNN mit Pytorch auszuführen
Ich habe versucht, mit VOICEROID2 2 automatisch zu lesen und zu speichern
Ich habe versucht, DCGAN mit PyTorch zu implementieren und zu lernen
Ich habe versucht, Mine Sweeper auf dem Terminal mit Python zu implementieren
Ich habe versucht, eine CSV-Datei mit Python zu berühren
Ich habe versucht, Soma Cube mit Python zu lösen
Ich habe versucht, mit Blenders Python script_Part 02 zu beginnen
Ich habe versucht, ObjectId (Primärschlüssel) mit Pymongo zu generieren
[Go + Gin] Ich habe versucht, eine Docker-Umgebung zu erstellen
Ich habe versucht, unsere Dunkelheit mit der Chatwork-API aufzudecken
[Einführung in Pytorch] Ich habe versucht, Cifar10 mit VGG16 ♬ zu kategorisieren
Ich habe versucht, das Problem mit Python Vol.1 zu lösen
Ich habe versucht, Grad-CAM mit Keras und Tensorflow zu implementieren
Ich habe versucht, eine OCR-App mit PySimpleGUI zu erstellen
Ich habe versucht, SSD jetzt mit PyTorch zu implementieren (Dataset)
Ich habe versucht, Mask R-CNN mit Optical Flow zu interpolieren
Ich habe versucht, die Bayes'sche Optimierung zu durchlaufen. (Mit Beispielen)
Ich habe versucht, die alternative Klasse mit Tensorflow zu finden
[Einführung in AWS] Ich habe versucht, mit der Sprach-Text-Konvertierung zu spielen ♪