Run a machine learning pipeline with Cloud Dataflow (Python)

On GCP, Cloud Dataflow is typically used for ETL pipelines, but as a note to myself I tried using it not only for preprocessing but also to run a pipeline that includes the machine learning itself.

What I want to do

When running a large number of training experiments in a distributed environment, I want to be able to easily run the training / prediction scripts that I have been running on a small scale locally. Modifying those scripts every time so that they would run remotely, and setting up machines and distributing the work myself, was tedious. I also want to be able to go from preprocessing (such as feature generation) through training and evaluation in a single run: if the preprocessing code and the training code are separated, the prediction model cannot be reproduced unless the code and intermediate data are carefully versioned, whereas if everything is implemented as one pipeline it also looks easy to incorporate into a production system.

Installation

Install the following on the working machine that runs the pipeline locally or submits jobs to the cloud. (Note that at the time of writing, only Python 2 is supported when running in the cloud.)

sh


git clone https://github.com/apache/beam.git
cd beam/sdks/python/
python setup.py sdist
cd dist/
pip install apache-beam-sdk-*.tar.gz

sh


pip install --upgrade google-cloud-dataflow

In my environment these installed versions 0.6.0 and 0.5.5, respectively. After that, install any libraries you need to run your code, such as scikit-learn and pandas.

Running a scikit-learn training / prediction pipeline

Here, let's consider the following hypothetical training / prediction pipeline, using pandas and scikit-learn, which are already installed in the Dataflow execution environment.

The data for both training and evaluation is assumed to have been prepared in advance and loaded into BigQuery, the hyperparameters are assumed to be already decided, and we want to evaluate a large number of prediction models at once. The model is assumed to be retrained every year to deal with its performance degrading over time.

The explanation below uses the data returned by the following query as an example.

sql


SELECT year,date,shop_id,sales,attr1,attr2,attr3
FROM dataset.table

Here, shop_id is the unique key of the store, sales is the target variable, and attr1 to attr3 are the features.

Options

First, we set the pipeline options.

option setting


import apache_beam as beam
import apache_beam.transforms.window as window

options = beam.utils.pipeline_options.PipelineOptions()

google_cloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
google_cloud_options.project = '{YOUR_PROJECT}'
google_cloud_options.job_name = 'sklearn'
google_cloud_options.staging_location = 'gs://{YOUR_BUCKET}/binaries'
google_cloud_options.temp_location = 'gs://{YOUR_BUCKET}/temp'

worker_options = options.view_as(beam.utils.pipeline_options.WorkerOptions)
worker_options.max_num_workers = 10

#options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DirectRunner'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DataflowRunner'

pipeline = beam.Pipeline(options=options)

GoogleCloudOptions holds the settings for running on GCP. staging_location and temp_location specify where the executables and temporary files are stored.

WorkerOptions configures the workers. By default, GCP automatically determines the configuration according to the load. (The Japanese documentation states that autoscaling is not supported for Python, but the English documentation says it is.) Even with autoscaling enabled, you can cap the scale by specifying the maximum number of workers with max_num_workers.

StandardOptions specifies the environment in which the pipeline runs. If you specify DirectRunner it runs locally, and if you specify DataflowRunner it runs on GCP. A good approach is to check a small workload locally and, if there are no problems, run it on the cloud.

There are many other options; you can look them up in the command-line help and in the comments in the source code.
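As a minimal sketch (the flag names below are assumed to match the SDK's command-line help, so check --help in your version), the same settings can also be passed as command-line style flags when constructing PipelineOptions:


import apache_beam as beam

# A sketch, not the code above: the same settings passed as flags.
# The flag names are assumed from the SDK's command-line help.
options = beam.utils.pipeline_options.PipelineOptions([
    '--project={YOUR_PROJECT}',
    '--job_name=sklearn',
    '--staging_location=gs://{YOUR_BUCKET}/binaries',
    '--temp_location=gs://{YOUR_BUCKET}/temp',
    '--max_num_workers=10',
    '--runner=DataflowRunner',
])
pipeline = beam.Pipeline(options=options)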

Pipeline definition

The pipeline is described by connecting each processing step in order with the pipe operator.
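As a minimal, throwaway illustration of the syntax (unrelated to the actual pipeline below), "|" chains transforms together and ">>" gives each step a label:


import apache_beam as beam

# Throwaway example, separate from the pipeline defined below:
# "|" chains transforms, and ">>" attaches a label to each step.
p = beam.Pipeline()
(p
 | "Create numbers" >> beam.Create([1, 2, 3])
 | "Double them"    >> beam.Map(lambda x: x * 2))
p.run()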

Pipeline


(pipeline
 | "Query data"  >> beam.Read(beam.io.BigQuerySource(query=query))
 | "Assign time" >> beam.Map(assign_timevalue)
 | "Set window"  >> beam.WindowInto(window.SlidingWindows(size=3, period=1))
 | "Assign group key" >> beam.Map(lambda v: (v["shop_id"], v))
 | "Group by group key and time window" >> beam.GroupByKey()
 | "Learn and predict"  >> beam.FlatMap(learn_predict)
 | "Write predict data" >> beam.Write(beam.io.BigQuerySink('dataset.table',
                              schema="shop_id:STRING, date:STRING, predict:FLOAT, sales:INTEGER",
                              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))

pipeline.run()

The first step reads the data from BigQuery using the query. The second step assigns to each record the reference value that determines which windows it belongs to in the next step; since the data is split by year here, the column holding each record's year is used (details below). The third step specifies the width and the interval of the windows: the width is 3 years (2 years for training, 1 year for prediction) shifted 1 year at a time, so size=3 and period=1. SlidingWindows is a sliding window; there are many other types, such as FixedWindows for fixed windows and Sessions for session windows. The fourth step assigns the store ID as the key to group by. The fifth step groups the records by the window and the key (store) specified earlier. The sixth step runs training / prediction on each grouped chunk of data and returns the prediction results; FlatMap is used because the data aggregated per store and window is flattened back out into per-day records. The seventh step writes the daily prediction results to BigQuery. Finally, calling run() on the pipeline at the end executes it.
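To make the window assignment concrete, here is a small pure-Python illustration (the helper sliding_windows is hypothetical, not part of Beam) of which 3-year windows a record stamped with a given year falls into when size=3 and period=1:


def sliding_windows(year, size=3, period=1):
    # Hypothetical helper, not Beam code: enumerate the [start, end)
    # windows of the given size that contain `year`, spaced `period` apart.
    starts = range(year - size + period, year + 1, period)
    return [(start, start + size) for start in starts]

print(sliding_windows(2016))
# [(2014, 2017), (2015, 2018), (2016, 2019)]
# Each window therefore collects three consecutive years per store,
# which is what learn_predict below expects (2 years train + 1 year test).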

Next, let's take a look inside each function.

A function that returns the value used for windowing


def assign_timevalue(v):

    import apache_beam.transforms.window as window

    return window.TimestampedValue(v, v["year"])

To specify the value used for windowing, wrap the record in a TimestampedValue. The first argument of TimestampedValue is the value itself, and the second is the value used for windowing. One caveat: packages and modules referenced inside the function must be imported inside it. This is not a problem when running locally, but on the cloud this function is distributed to and executed on worker nodes, so the imports must also work in the worker node's environment. Also note that globally defined variables cannot be accessed on the cloud.

The training / prediction function


def learn_predict(records):

    import pandas as pd
    from sklearn.ensemble import GradientBoostingRegressor

    target_attr = 'sales'
    learn_attrs = ['attr1', 'attr2', 'attr3']

    data = pd.DataFrame(records[1])
    data[learn_attrs] = data[learn_attrs].apply(pd.to_numeric)
    data = data.fillna(0.)

    if len(data["year"].unique()) < 3:
        return []  # skip groups that do not span 3 years

    year_max = data["year"].max()
    train = data[data["year"] <  year_max]         # first 2 years are used for training
    test  = data[data["year"] == year_max].copy()  # last year is used for prediction / evaluation

    model = GradientBoostingRegressor()
    model.fit(train[learn_attrs], train[target_attr])
    test.loc[:, "predict"] = model.predict(test[learn_attrs])

    return test[["shop_id","date","predict","sales"]].to_dict(orient='records')

The data is passed to the training / prediction function as a tuple of the key (store ID) and a list of dictionaries, so the list of values is converted into a DataFrame for training / prediction. The last line converts the result back into a list of dictionaries so that it can be inserted into BigQuery in the following step.
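As a rough sketch of what one grouped element looks like (the values below are made up), you can also call learn_predict directly like this to sanity-check it locally:


# Made-up example of one grouped element: (key, list of row dicts).
records = ("shop_001", [
    {"shop_id": "shop_001", "year": 2014, "date": "2014-01-01",
     "sales": 100, "attr1": 1.0, "attr2": 0.5, "attr3": 3.0},
    {"shop_id": "shop_001", "year": 2015, "date": "2015-01-01",
     "sales": 110, "attr1": 1.2, "attr2": 0.4, "attr3": 2.5},
    {"shop_id": "shop_001", "year": 2016, "date": "2016-01-01",
     "sales": 120, "attr1": 1.1, "attr2": 0.6, "attr3": 2.8},
])

# Returns a list of dicts, one per row of the final year.
print(learn_predict(records))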

When you run the pipeline, the prediction results and the ground-truth data end up in BigQuery, so you can compute and evaluate metrics such as RMSE from various angles (per store, per year, and so on) with SQL.
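For example, a rough sketch of such an evaluation done in pandas instead of SQL (this assumes pandas' BigQuery connector is installed, and the query and column names follow the output table written above):


import pandas as pd

# Rough sketch (assumes pandas' BigQuery connector is installed):
# pull the prediction table and compute RMSE per store and per year.
df = pd.read_gbq("SELECT shop_id, date, predict, sales FROM dataset.table",
                 project_id='{YOUR_PROJECT}')
df["year"] = df["date"].str[:4]                    # date is stored as a STRING
df["sq_err"] = (df["predict"] - df["sales"]) ** 2
rmse = df.groupby(["shop_id", "year"])["sq_err"].mean() ** 0.5
print(rmse)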

In closing

Running training on Dataflow may go against the intended purpose of the service, but it did work. This time the pipeline was a simple one-way flow that trains and predicts from data prepared in BigQuery and saves the results, but it seems it can be changed flexibly: adding more data processing, feeding evaluation data in from another flow, or branching the prediction results and the prediction model and outputting them to later stages. The hyperparameters were assumed to be fixed this time, but I would also like to try massively parallel parameter tuning.

Cloud Dataflow is a service that has not received much attention within GCP yet, but for applications that involve complex data processing such as machine learning, building and operating the data flow tends to be a burden, and Dataflow handles that in a fully managed way. Personally, I expect it to become something like an App Engine for data analysis applications.

This time I used scikit-learn, which is installed in Dataflow by default, but in practice you will want to use various other libraries. Next time, I plan to describe the procedure for installing arbitrary libraries, using xgboost as an example.
