Wir werden eine Reihe von Aufgaben koordinieren, wenn wir das folgende maschinelle Lernen mit Cloud Composer von Google Cloud Platform durchführen.
Der Knoten und der Workflow für den zu erstellenden Luftstrom sind in der folgenden Abbildung dargestellt.
Die oben genannten Airflow-Aufgaben können von den GCP-Diensten wie folgt ausgedrückt werden.
Der folgende Bash-Befehl führt drei Dinge aus.
Eine Sache, die Sie beim Erstellen der Cloud Composer-Umgebung beachten sollten, ist die Angabe von "--python-version 3" als Argument. Standardmäßig ist die Python2-Serie festgelegt.
In der am Anfang angezeigten Aufgabenliste gab es einen Platz zum Posten einer Nachricht in Slack. Sie müssen die Bibliothek "slackclient" im Luftstrom installieren, um diese Aufgabe auszuführen. Geben Sie die Bibliothekskonfigurationsdatei im Argument "--update-pypi-packages-from-file" an.
requirements.txt
slackclient~=1.3.2
Wie oben erwähnt, ist "acccess_token" erforderlich, wenn eine Nachricht mithilfe der slackclient-Bibliothek an slack gesendet wird. Daher ist es zweckmäßig, "access_token" in der Umgebungsvariablen "Luftstrom" festzulegen. Stellen Sie sie daher im Voraus ein. (Es ist nicht so gut, access_token
in der dag-Datei zu verfestigen)
#!/usr/bin/env bash
ENVIRONMENT_NAME=dev-composer
LOCATION=us-central1
#Variablen lesen
eval `cat airflow/config/.secrets.conf`
echo ${slack_access_token}
#Erstellen einer Umgebung für Cloud Composer
gcloud composer environments create ${ENVIRONMENT_NAME} \
--location ${LOCATION} \
--python-version 3
#Installieren Sie die Bibliothek in der Luftstromumgebung
gcloud composer environments update ${ENVIRONMENT_NAME} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${LOCATION}
#Stellen Sie Umgebungsvariablen für den Luftstrom ein
gcloud composer environments run \
--location=${LOCATION} \
${ENVIRONMENT_NAME} \
variables -- \
--set slack_access_token ${slack_access_token} project_id ${project_id}
Die diesmal erstellte DAG-Datei lautet wie folgt. Dies reicht nicht aus, um es zu erklären, daher werde ich es erklären, indem ich den Code für jede Aufgabe trenne.
import os
import airflow
import datetime
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow import configuration
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.operators.mlengine_operator \
import MLEngineTrainingOperator, MLEngineBatchPredictionOperator
BUCKET = 'gs://your_bucket'
PROJECT_ID = Variable.get('project_id')
REGION = 'us-central1'
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'
DATAFLOW_TRAIN_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_train_data.py')
DATAFLOW_PRED_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_pred_data.py')
DATAFLOW_LOAD_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'load.py')
DEFAULT_ARGS = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
'project_id': Variable.get('project_id'),
'dataflow_default_options': {
'project': Variable.get('project_id'),
'temp_location': 'gs://your_composer_bucket/temp',
'runner': 'DataflowRunner'
}
}
def get_date():
jst_now = datetime.datetime.now()
dt = datetime.datetime.strftime(jst_now, "%Y-%m-%d")
return dt
with airflow.DAG(
'asl_ml_pipeline',
'catchup=False',
default_args=DEFAULT_ARGS,
schedule_interval=datetime.timedelta(days=1)) as dag:
start = DummyOperator(task_id='start')
####
#Aufgaben zum Trainieren auf ML Engine
####
job_id = 'dev-train-{}'.\
format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
job_dir = BUCKET + '/jobs/' + job_id
submit_train_job = MLEngineTrainingOperator(
task_id='train-model',
project_id=PROJECT_ID,
job_id=job_id,
package_uris=[PACKAGE_URI],
region=REGION,
training_python_module='trainer.task',
training_args=[f'--output_dir={OUTDIR}',
f'--job_dir={job_dir}',
'--dropout_rate=0.5',
'--batch_size=128',
'--train_step=1'
],
scale_tier='BASIC_GPU',
python_version='3.5'
)
####
#Aufgabe zum Bereitstellen des Modells
####
BASE_VERSION_NAME = 'v1_0'
VERSION_NAME = '{0}_{1}'.\
format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
MODEL_NAME = 'dev_train'
deploy_model = BashOperator(
task_id='deploy-model',
bash_command='gcloud ml-engine versions create '
'{{ params.version_name}} '
'--model {{ params.model_name }} '
'--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
'--python-version="3.5" '
'--runtime-version=1.14 ',
params={'version_name': VERSION_NAME,
'model_name': MODEL_NAME}
)
####
#Aufgabe zur Stapelvorhersage mit ML Engine
####
today = get_date()
input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
output_path = BUCKET + f'/result/{today}/'
batch_prediction = MLEngineBatchPredictionOperator(
task_id='batch-prediction',
data_format='TEXT',
region=REGION,
job_id=job_id,
input_paths=input_path,
output_path=output_path,
model_name=MODEL_NAME,
version_name=VERSION_NAME
)
####
#Aufgabe zum Extrahieren von Daten mit DataFlow
####
job_args = {
'output': 'gs://your_bucket/preprocess'
}
create_train_data = DataFlowPythonOperator(
task_id='create-train-data',
py_file=DATAFLOW_TRAIN_FILE,
options=job_args
)
create_pred_data = DataFlowPythonOperator(
task_id='create-pred-data',
py_file=DATAFLOW_PRED_FILE,
options=job_args
)
####
#Aufgabe zum Laden von Daten in BigQuery mit DataFlow
####
load_results = DataFlowPythonOperator(
task_id='load_pred_results',
py_file=DATAFLOW_LOAD_FILE
)
post_success_slack_train = SlackAPIPostOperator(
task_id='post-success-train-to-slack',
token=Variable.get('slack_access_token'),
text='Train is succeeded',
channel='#feed'
)
post_fail_slack_train = SlackAPIPostOperator(
task_id='post-fail-train-to-slack',
token=Variable.get('slack_access_token'),
trigger_rule=TriggerRule.ONE_FAILED,
text='Train is failed',
channel='#feed'
)
####
#Aufgabe, eine Nachricht an Slack zu senden
####
post_success_slack_pred = SlackAPIPostOperator(
task_id='post-success-pred-to-slack',
token=Variable.get('slack_access_token'),
text='Prediction is succeeded',
channel='#feed'
)
post_fail_slack_pred = SlackAPIPostOperator(
task_id='post-fail-pred-to-slack',
token=Variable.get('slack_access_token'),
trigger_rule=TriggerRule.ONE_FAILED,
text='Prediction is failed',
channel='#feed'
)
end = DummyOperator(task_id='end')
start >> [create_train_data, create_pred_data] >> submit_train_job \
>> [post_fail_slack_train, post_success_slack_train]
post_fail_slack_train >> end
post_success_slack_train >> deploy_model >> batch_prediction \
>> load_results \
>> [post_success_slack_pred, post_fail_slack_pred] >> end
Dies ist die erste Aufgabe, die wir in der Trainingsphase ausführen. (Roter Rahmen in der Abbildung unten) Daten werden mit DataFlow aus BigQuery extrahiert und in einen geeigneten Eimer GCS gestellt.
Konstante DATAFLOW_TRAIN_FILE
/ DATAFLOW_PRED_FILE
--Dateipfad, in dem sich die ausführbare DataFlow-Datei befindet
--Die Dateien im Verzeichnis "dags" im "Bucket", die beim Erstellen der Cloud Composer-Umgebung erstellt wurden, werden alle paar Sekunden vom Luftstromarbeiter synchronisiert.
Konstante DEFAULT_ARGS
Setzen Sie die Umgebungsvariablen beim Ausführen von DataFlow im Argument "dataflow_default_options".
--DataFlowPythonOperator
Klasse
Klasse zum Ausführen von JOB mit DataFlow als Python-Datei
Das Argument "py_file" enthält den Pfad, in dem sich die ausführbare Datei befindet.
Geben Sie im Argument options
das Argument an, das an die ausführbare Datei übergeben werden soll
Dieses Mal wird der Dateipfad zum Platzieren von Daten in GCS angegeben.
DATAFLOW_TRAIN_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_train_data.py')
DATAFLOW_PRED_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_pred_data.py')
DEFAULT_ARGS = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
'project_id': Variable.get('project_id'),
'dataflow_default_options': {
'project': Variable.get('project_id'),
'temp_location': 'gs://your_composer_bucket/temp',
'runner': 'DataflowRunner'
}
}
####
#Aufgabe zum Extrahieren von Daten mit DataFlow
####
#Dateipfad beim Einfügen von Daten in GCS
job_args = {
'output': 'gs://your_bucket/preprocess'
}
create_train_data = DataFlowPythonOperator(
task_id='create-train-data',
py_file=DATAFLOW_TRAIN_FILE,
options=job_args
)
create_pred_data = DataFlowPythonOperator(
task_id='create-pred-data',
py_file=DATAFLOW_PRED_FILE,
options=job_args
)
Die folgende Datei ist ein Prozess zum Aufteilen in "Zugdaten" und "Testdaten" und zum Einfügen in GCS für das Training. (Zur Vereinfachung des Artikels werde ich auch die Erläuterung der Abfrage zur Unterteilung in "Zugdaten" und "Testdaten" weglassen. Das Hashing wird unter der Annahme durchgeführt, dass eine Zeitstempelspalte vorhanden ist, und der Rest nach der Unterteilung (Dividiert durch den Wert von)
Hier sind zwei Punkte zu beachten.
In CSV-Datei konvertieren
Da Sie schließlich als CSV-Datei ausgeben möchten, müssen Sie die aus BigQuery extrahierten Daten in ein durch Kommas getrenntes Format konvertieren.
Hier haben wir eine Funktion namens to_csv
vorbereitet.
Python-Version von DataFlow
Da Python von DataFlow 3.5.x ist, kann die aus 3.6 hinzugefügte Syntax wie "f-Strings" nicht verwendet werden.
import os
import argparse
import logging
from datetime import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import \
PipelineOptions
PROJECT = 'your_project_id'
def create_query(phase):
base_query = """
SELECT
*,
MOD(ABS(FARM_FINGERPRINT(CAST(timestamp AS STRING))), 10) AS hash_value
FROM
`dataset.your_table`
"""
if phase == 'TRAIN':
subsumple = """
hash_value < 7
"""
elif phase == 'TEST':
subsumple = """
hash_value >= 7
"""
query = """
SELECT
column1,
column2,
column3,
row_number()over() as key
FROM
({0})
WHERE {1}
""".\
format(base_query, subsumple)
return query
def to_csv(line):
csv_columns = 'column1,column2,column3,key'.split(',')
rowstring = ','.join([str(line[k]) for k in csv_columns])
return rowstring
def get_date():
jst_now = datetime.now()
dt = datetime.strftime(jst_now, "%Y-%m-%d")
return dt
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--output',
required=True
)
known_args, pipeline_args = \
parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=options) as p:
for phase in ['TRAIN', 'TEST']:
query = create_query(phase)
date = get_date()
output_path = os.path.join(known_args.output, date,
'train', "{}".format(phase))
read = p | 'ExtractFromBigQuery_{}'.format(phase) >> beam.io.Read(
beam.io.BigQuerySource(
project=PROJECT,
query=query,
use_standard_sql=True
)
)
convert = read | 'ConvertToCSV_{}'.format(phase) >> beam.Map(to_csv)
convert | 'WriteToGCS_{}'.format(phase) >> beam.io.Write(
beam.io.WriteToText(output_path, file_name_suffix='.csv'))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Hier werden die Aufgaben des Trainings mit ML Engine und des Einsatzes des trainierten Modells nicht erläutert.
Auch dieses Mal werde ich die Erklärung der Ausführungsdatei task.py
und der in ML Engine verwendeten Modelldatei model.py
weglassen.
Diesmal bereiten wir einen Eimer für ML Engine vor. Beachten Sie daher, dass insgesamt zwei Buckets ordnungsgemäß verwendet werden, wenn sie mit dem Bucket kombiniert werden, das beim Erstellen der Cloud Composer-Umgebung erstellt wurde.
.
├── your_bucket
│ ├── code //Modell zum Lernen erforderlich.py und Aufgabe.Legen Sie eine gz-Datei mit py usw.
│ │
│ └── trainde_model //Die trainierte Modelldatei wird platziert
│
└── your_composer_bucket //Bucket erstellt beim Erstellen einer Cloud Composer-Umgebung
Konstante PACKAGE_URI
--Dateipfad, in dem sich die ausführbare Datei der ML Engine befindet
Dieses Mal wird die gz-Datei mit "Trainer.py" und "model.py" unter den obigen "gs: // your_bucket / code" abgelegt. --Diese Konstante wird im Argument "package_uris" der Klasse "MLEngineTrainingOperator" angegeben.
BashOperator
Klasse
Klasse zum Ausführen des Befehls bash
Dieses Mal wird der Befehl bash verwendet, wenn die trainierte Modelldatei bereitgestellt wird. --Das Modell wird durch Ausführen von "gcloud ml-engine version create" bereitgestellt.
(Wahrscheinlich) Sie können dasselbe mit der Klasse MLEngineVersionOperator
tun, aber diesmal habe ich den Befehl bash verwendet.
PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'
job_id = 'dev-train-{}'.\
format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
job_dir = BUCKET + '/jobs/' + job_id
submit_train_job = MLEngineTrainingOperator(
task_id='train-model',
project_id=PROJECT_ID,
job_id=job_id,
package_uris=[PACKAGE_URI],
region=REGION,
training_python_module='trainer.task',
training_args=[f'--output_dir={OUTDIR}',
f'--job_dir={job_dir}',
'--dropout_rate=0.5',
'--batch_size=128',
'--train_step=1'
],
scale_tier='BASIC_GPU',
python_version='3.5'
)
today = get_date()
BASE_VERSION_NAME = 'v1_0'
VERSION_NAME = '{0}_{1}'.\
format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
MODEL_NAME = 'dev_model'
deploy_model = BashOperator(
task_id='deploy-model',
bash_command='gcloud ml-engine versions create '
'{{ params.version_name}} '
'--model {{ params.model_name }} '
'--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
'--python-version="3.5" '
'--runtime-version=1.14 ',
params={'version_name': VERSION_NAME,
'model_name': MODEL_NAME}
)
Hier erklären wir die Aufgabe der Stapelvorhersage anhand des zuvor bereitgestellten Modells.
Konstante input_path
Der Pfad der CSV-Datei, die für die zuvor gemachte Vorhersage extrahiert und in GCS abgelegt wurde
Konstanter Ausgabepfad
--GCS-Dateipfad, um das vorhergesagte Ergebnis zu setzen
input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
output_path = BUCKET + f'/result/{today}/'
batch_prediction = MLEngineBatchPredictionOperator(
task_id='batch-prediction',
data_format='TEXT',
region=REGION,
job_id=job_id,
input_paths=input_path,
output_path=output_path,
model_name=MODEL_NAME,
version_name=VERSION_NAME
)
In diesem Abschnitt wird die Aufgabe beschrieben, das zuvor vorhergesagte Vorhersageergebnis mithilfe von DataFlow in BigQuery zu laden.
Bei Dag-Dateien ähnelt es der eingangs beschriebenen Aufgabe "Daten aus BigQuery extrahieren".
Die Konstante DATAFLOW_LOAD_FILE
gibt den GCS-Dateipfad an, in dem sich die ausführbare DataFlow-Datei befindet.
DATAFLOW_LOAD_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'load.py')
load_results = DataFlowPythonOperator(
task_id='load_pred_results',
py_file=DATAFLOW_LOAD_FILE
)
In der folgenden Datei wird die in GCS platzierte Datei gelesen, in das Json-Format konvertiert und in eine entsprechende BigQuery-Tabelle geladen. Worauf sollten Sie hier achten?
{"key": [0], "prediction: [3.45...]"}
import logging
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
BUCKET_NAME = 'your_bucket'
INPUT = 'gs://{}/result/prediction.results-*'.format(BUCKET_NAME)
def convert(line):
import json
record = json.loads(line)
return {'key': record['key'][0], 'predictions': record['predictions'][0]}
def run(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = \
parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=options) as p:
dataset = 'your_dataset.results'
read = p | 'ReadPredictionResult' >> beam.io.ReadFromText(INPUT)
json = read | 'ConvertJson' >> beam.Map(convert)
json | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
dataset,
schema='key:INTEGER, predictions:FLOAT',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Normalerweise verwende ich AWS, aber da ich die Möglichkeit hatte, auf GCP-Dienste einzugehen, habe ich ML-bezogene Dienste zusammengefasst. Ich hoffe, es ist hilfreich für diejenigen, die sich Sorgen um den ML-Workflow machen.
Es gibt einen Ort, an dem das mit ML Engine trainierte Modell bereitgestellt werden kann, dies wird jedoch nicht empfohlen. Wir empfehlen, dass Sie einen Mechanismus erstellen, um die Genauigkeit des erlernten Modells zu messen / zu vergleichen und ihn nach dem Einfügen der Aufgabe bereitzustellen.
Recommended Posts