[PYTHON] How to query BigQuery from Kubeflow Pipelines and save the result, with some notes

Introduction

I will summarize how to send a query from Kubeflow Pipelines to BigQuery and save the query result, covering the following three patterns.

1. CSV file
2. GCS
3. BigQuery

I will also note some points I noticed while implementing them.

Environment

import sys
sys.version
"""
'3.7.7 (default, May  6 2020, 04:59:01) \n[Clang 4.0.1 (tags/RELEASE_401/final)]'
"""

import kfp
kfp.__version__
"""
'1.0.0'
"""

As of January 2021, the latest version of kfp, the Python SDK for Kubeflow Pipelines, is 1.3.0, but I am using 1.0.0 because that is what was installed in my execution environment (AI Platform Pipelines).
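If you want to pin the same SDK version, here is a minimal sketch (an assumption about how you install it, mirroring the subprocess-based pip installs used in the components later in this article):

# A minimal sketch, assuming you install the SDK with pip from Python.
import subprocess
subprocess.run(['pip', 'install', 'kfp==1.0.0'])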

About the base image

A KFP component that submits a query to BigQuery has existed since around July 2020, but because Python 2.7 was used for its base image, an encoding error occurred whenever the query statement contained Japanese.

The base image was updated to Python 3.7 in a PR merged just the other day, so queries containing Japanese can now be processed correctly.

In other words, as of January 2021, if your query contains Japanese and you do not specify a component URL like the following, a component built on a Python 2 image will be used and the run will fail with an encoding error, so be careful. Use the components under 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/...'.
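As a concrete example, loading the component pinned to the 1.3.0 tag (the same to_gcs URL used later in this article) might look like the following sketch.

# A minimal sketch: load the BigQuery component pinned to the 1.3.0 tag so that
# a Python 3 base image is used (this is the to_gcs component URL used later in this article).
from kfp import components as comp

bigquery_query_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_gcs/component.yaml')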

Preparation

The samples in this article assume the following declarations.

import kfp
from kfp import dsl
from kfp import components as comp
from kfp.components import func_to_container_op
from kfp.components import InputPath

HOST = 'URL of your Kubeflow Pipelines host'
PROJECT_ID = 'your GCP project ID'
QUERY = '''
SELECT
    * 
FROM
    `bigquery-public-data.stackoverflow.posts_questions` 
LIMIT
    10
--This is a test
'''

Every pipeline in this article is executed as follows.

result = kfp.Client(host=HOST).create_run_from_pipeline_func(pipeline, arguments={})
result
"""
Experiment link here
Run link here
RunPipelineResult(run_id=ee82166c-707b-4e5f-84d2-5d98d7189023)
"""

Save to CSV file

Code

Declare the output file name and load the component.

#CSV file name
FILENAME = 'query_result.csv'

#BigQuery to CSV component URL
bigquery_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_CSV/component.yaml'
bigquery_query_op = comp.load_component_from_url(bigquery_op_url)
help(bigquery_query_op)
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', job_config: dict = '', output_filename: str = 'bq_results.csv')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery and
    store the results to a csv file.
"""

The help function shows the arguments the component accepts, so set the arguments while referring to this output.

Check that the result was output to CSV with the following two tasks.

- task 1. Check the output path
- task 2. Read the CSV from the output path and print its shape

# task 1
@func_to_container_op
def print_op(text: InputPath('CSV')) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

# task 2
@func_to_container_op
def handle_csv_op(path: InputPath('CSV')) -> None:
    print(f'path: {path}')
    print(f'type: {type(path)}')
    
    import subprocess
    subprocess.run(['pip', 'install', 'pandas'])
    import pandas as pd

    df = pd.read_csv(path)
    print(f'shape: {df.shape}')

#bonus
@func_to_container_op
def print_op_non_type(text) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

# pipeline
@dsl.pipeline(
    name='Bigquery query pipeline name',
    description='Bigquery query pipeline'
)
def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME)
    print_op(bq_task.outputs['table']) # task 1
    handle_csv_op(f"{bq_task.outputs['table']}/{FILENAME}") # task 2
    print_op_non_type(bq_task.outputs['table']) #bonus

Execution result

# print_op log
text: /tmp/inputs/text/data
type: <class 'str'>

# handle_csv_op log
path: /tmp/inputs/path/data
type: <class 'str'>
shape: (10, 20)

# print_op_non_type log
text: ,id,title,body,accepted_answer_id,answer_count,comment_count,community_owned_date,creation_date,favorite_count,last_activity_date,last_edit_date,last_editor_display_name,last_editor_user_id,owner_display_name,owner_user_id,parent_id,post_type_id,score,tags,view_count
0,65070674,NewRelic APM cpu usage shows incorrect values in comparison to K8S cluster cpu chart,"<p>Here goes charts of CPU usage of same pod. <strong>chart 1</strong> is from k8s cluster, <strong>chart 2</strong> is from APM.</p>
<ol></ol>"
...
type: <class 'str'>

From the execution result log, you can see the following.

- The path received by InputPath('CSV') is something like /tmp/inputs/<variable name>/data
- The file name specified by the argument does not appear in the component output (bq_task.outputs['table'])

# print_op log
text: /tmp/inputs/text/data

# handle_csv_op log
# "{bq_task.outputs['table']}/{FILENAME}" is passed as the argument, but FILENAME does not appear
path: /tmp/inputs/path/data

- When passing the query result to the next component, if you do not declare the argument type with InputPath('CSV'), the query result itself is passed as a string.

# print_op_non_type log
text: ,id,title,body,accepted_answer_id,answer_count,comment_count,community_owned_date,creation_date,favorite_count,last_activity_date,last_edit_date,last_editor_display_name,last_editor_user_id,owner_display_name,owner_user_id,parent_id,post_type_id,score,tags,view_count
0,65070674,NewRelic APM cpu usage shows incorrect values in comparison to K8S cluster cpu chart,"<p>Here goes charts of CPU usage of same pod. <strong>chart 1</strong> is from k8s cluster, <strong>chart 2</strong> is from APM.</p>
<ol></ol>"

... (omitted)
type: <class 'str'>

Important points

Part 1

When passing the query result as a string, if the destination component's argument type is declared as str, the run fails with a type mismatch, so passing the component output in any form other than InputPath('xxx') seems to be discouraged.

... (omitted)

#Specify the argument type as str
@func_to_container_op
def print_op(text:str) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME)
    #Since the output of the component is PipelineParam type and the argument is str, the following tasks will fail due to the argument type mismatch.
    print_op(bq_task.outputs['table']) # task 1

Part 2

As mentioned above, the component output (bq_task.outputs['table']) is a placeholder of type PipelineParam, so it cannot be concatenated with strings or otherwise operated on directly.

That is why the program above embeds it with an f-string.

def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME)
    #The following method fails because PipelineParam type cannot be cast to string
    # print_op(bq_task.outputs['table'] + "/" + FILENAME) # task 1
    #This passes
    print_op(f"{bq_task.outputs['table']}/{FILENAME}") # task 1

Since the actual values are only assigned at pipeline runtime, take care when handling component outputs.

Reference: Kubeflow - Pipeline Parameters
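To illustrate why the f-string works, here is a minimal sketch based on my understanding of the KFP v1 SDK (the exact placeholder text and error behavior are assumptions):

# A minimal sketch: PipelineParam is a placeholder object, so "+" concatenation
# with a str is assumed to fail, while an f-string embeds its serialized
# placeholder, which is substituted with the real value at pipeline runtime.
from kfp import dsl

param = dsl.PipelineParam(name='table', op_name='bigquery-query')
# param + '/query_result.csv'        # assumed to raise a TypeError
print(f"{param}/query_result.csv")   # prints a '{{pipelineparam:...}}' placeholder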

Save to GCS

Code

Declare the output file path and load the component. As you can see from the help output, it takes different arguments than the component that saves to a CSV file.

#Path to the file to save to GCS
BUCKET = 'Bucket name'
GCS_PATH = f'gs://{BUCKET}/query_from_kfp/query_result.csv'

bigquery_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_gcs/component.yaml'
bigquery_query_op = comp.load_component_from_url(bigquery_op_url)
help(bigquery_query_op)
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', dataset_id: str = '', table_id: str = '', output_gcs_path: 'GCSPath' = '', dataset_location: str = 'US', job_config: dict = '', output_kfp_path: str = '')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery 
    service and dump outputs to a Google Cloud Storage blob.
"""

Confirm that the result was output to GCS with the following two tasks, as before.

- task 1. Check the output GCS path
- task 2. Read the CSV from the output GCS path and print its shape

# task 1
@func_to_container_op
def print_op(text: InputPath('GCSPath')) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

# task 2
@func_to_container_op
def handle_csv_op(gcs_file_path: InputPath('GCSPath'), project:str) -> None:
    print(f'path: {gcs_file_path}')
    print(f'type: {type(gcs_file_path)}')
    
    import subprocess
    subprocess.run(['pip', 'install', 'google-cloud-storage', 'pandas'])

    from google.cloud import storage
    from io import BytesIO
    import pandas as pd

    client = storage.Client(project)
    # point 1
    with open(gcs_file_path, 'r') as f:
        path = f.read()
    # point 2
    with BytesIO() as f:
        client.download_blob_to_file(path, f)
        content = f.getvalue()
    df = pd.read_csv(BytesIO(content))
    print(f'shape: {df.shape}')

# pipeline
@dsl.pipeline(
    name='Bigquery query pipeline name',
    description='Bigquery query pipeline'
)
def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_gcs_path=GCS_PATH)
    print_op(bq_task.outputs['output_gcs_path']) # task 1
    handle_task = handle_csv_op(gcs=bq_task.outputs['output_gcs_path'],
                                project=PROJECT_ID) # task 2

Execution result

# print_op log
text: /tmp/inputs/text/data
type: <class 'str'>

# handle_csv_op log
path: /tmp/inputs/gcs/data
type: <class 'str'>
shape: (10, 20)

Important points

Part 1

The handle_csv_op component's processing may look a little odd. In this case the query result is stored in GCS, so the output of the bigquery_query_op component is not the GCS path as a str, but the path to a local file in which the GCS path is written.

Therefore, after reading the GCS path as shown below,

# point 1
with open(gcs_file_path, 'r') as f:
    path = f.read() # gs://{BUCKET}/query_from_kfp/query_result.csv

Get the contents of the file from GCS as follows:

# point 2
with BytesIO() as f:
    client.download_blob_to_file(path, f)
    content = f.getvalue()
df = pd.read_csv(BytesIO(content))

This behavior is because output_gcs_path is defined as an OutputPath type in the component definition file (https://github.com/kubeflow/pipelines/blob/eeb7f8f04ac50351fd578a583a8ddc7df1e00bdd/components/gcp/bigquery/query/to_gcs/component.yaml#L79). I wish it were simply a plain string ... but why it is this way is a mystery to me.
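As an aside, if gcsfs is available (an assumption; the component above does not use it), pandas can read the gs:// path directly, which shortens point 2. A minimal sketch of such a component:

# A minimal sketch, assuming gcsfs is installed so pandas can open gs:// URLs directly.
from kfp.components import func_to_container_op, InputPath

@func_to_container_op
def handle_csv_simple_op(gcs_file_path: InputPath('GCSPath')) -> None:
    import subprocess
    subprocess.run(['pip', 'install', 'pandas', 'gcsfs'])
    import pandas as pd

    with open(gcs_file_path, 'r') as f:   # still the local file that contains the GCS path
        path = f.read()
    df = pd.read_csv(path)                # pandas reads the gs:// URL via gcsfs
    print(f'shape: {df.shape}')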

Part 2

If you declare a component argument with the InputPath type, certain suffixes (_path and _file) are stripped from the argument name.

For example, the handle_csv_op component has an argument named gcs_file_path, but when calling the component you refer to it as gcs.

# not gcs_file_path=bq_task.outputs['output_gcs_path']
handle_task = handle_csv_op(gcs=bq_task.outputs['output_gcs_path'],
                            project=PROJECT_ID) # task 2
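You can check the generated input names yourself; the following is a minimal sketch assuming the task factory returned by func_to_container_op exposes a component_spec attribute (as in the KFP v1 SDK):

# A minimal sketch (assumptions: the task factory exposes .component_spec, and the
# "_path"/"_file" suffixes are stripped from parameters annotated with InputPath).
from kfp.components import func_to_container_op, InputPath

@func_to_container_op
def show_op(gcs_file_path: InputPath('GCSPath')) -> None:
    print(gcs_file_path)

print([i.name for i in show_op.component_spec.inputs])  # expected: ['gcs']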

This is documented (see the reference below), but it is hard to find, so I got stuck on it. Kubeflow documentation and tutorials are scattered all over the place and hard to track down.

Reference: Building Python function-based components - passing parameters by value

Save to BigQuery

Submit a query to BigQuery and write the result to any BigQuery table.

Code

Declare the destination table and load the components.

#Where to save the query results
DATASET_ID = 'mail_retention_pipeline'
TABLE_ID = 'query_result'
FILENAME = 'query_result.csv'

#Query for checking query results
VERIFY_QUERY = f'''
SELECT
    * 
FROM
    `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
'''

#A component that saves query results in BigQuery
bigquery_table_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_table/component.yaml'
bigquery_query_table_op = comp.load_component_from_url(bigquery_table_op_url)

#Component that outputs query results to CSV
bigquery_csv_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_CSV/component.yaml'
bigquery_query_csv_op = comp.load_component_from_url(bigquery_csv_op_url)

help(bigquery_query_table_op)
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', table: str, dataset_id: str = '', table_id: str = '', dataset_location: str = 'US', job_config: dict = '')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery 
    service and dump outputs to new table.
"""

Confirm that the result is output to BigQuery with the following three tasks.

- task 1. Submit a query to BigQuery and save the result in BigQuery
- task 2. Fetch the query result from BigQuery and save it as CSV
- task 3. Read the CSV file and check its shape

# task 3
@func_to_container_op
def handle_csv_op(path: InputPath('CSV')) -> None:
    import subprocess
    subprocess.run(['pip', 'install', 'pandas'])
    import pandas as pd
    df = pd.read_csv(path)
    print(f'shape: {df.shape}')

@dsl.pipeline(
    name='Bigquery query pipeline name',
    description='Bigquery query pipeline'
)
def pipeline():
    # task 1:Save query results to BigQuery
    bq_table_task = bigquery_query_table_op(
        query=QUERY,
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=TABLE_ID,
        table='')
    # task 2:Save query results as CSV
    bq_csv_task = bigquery_query_csv_op(
        query=VERIFY_QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME).after(bq_table_task)
    handle_task = handle_csv_op(f"{bq_csv_task.outputs['table']}/{FILENAME}") # task 3

Execution result

# handle_csv_op log
path: /tmp/inputs/gcs/data
type: <class 'str'>
shape: (10, 20)

The DAG now has a pipeline-like shape. (screenshot: image.png)

Important points

Part 1

The bq_table_task component has a mysterious argument called table, and it will not work unless you pass some string to it. As far as I can tell from the source code, this parameter is not actually used, so it seems it was simply never cleaned up.

#A component that saves query results in BigQuery
bigquery_table_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_table/component.yaml'
bigquery_query_table_op = comp.load_component_from_url(bigquery_table_op_url)
...
#There is a positional argument called table
help(bigquery_query_table_op)
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', table: str, dataset_id: str = '', table_id: str = '', dataset_location: str = 'US', job_config: dict = '')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery 
    service and dump outputs to new table.
"""

When I checked the Kubeflow Pipelines repository, I found a PR that fixes this, so once it is merged this problem will be solved.

Part 2

Saving query results to BigQuery can actually also be achieved with the component that saves query results to GCS. As you can see from the help output, the component that stores query results in GCS also has the arguments dataset_id and table_id.

#A component that saves query results in BigQuery
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', table: str, dataset_id: str = '', table_id: str = '', dataset_location: str = 'US', job_config: dict = '')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery 
    service and dump outputs to new table.
"""

#A component that saves query results in GCS
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', dataset_id: str = '', table_id: str = '', output_gcs_path: 'GCSPath' = '', dataset_location: str = 'US', job_config: dict = '', output_kfp_path: str = '')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery 
    service and dump outputs to a Google Cloud Storage blob.
"""

In other words, unless you are in the situation where the query results are used only within the pipeline and do not need to be saved anywhere, it is fine to just use the component that saves query results to GCS.
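For example, writing the query result to a table with the to_gcs component might look like the following sketch (the argument names come from the help() output above; I have not verified this exact call):

# A minimal sketch (assumption: passing dataset_id/table_id to the to_gcs component
# writes the query result to that BigQuery table, as suggested by its help() output).
bigquery_query_gcs_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_gcs/component.yaml')

def pipeline():
    bq_task = bigquery_query_gcs_op(
        query=QUERY,
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=TABLE_ID)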

Summary

- Be careful about the version when using the official components that query BigQuery: with the old base image, Japanese cannot be used in query comments
- Pay attention to the input and output types of each component
- Certain suffixes are stripped from the component's keyword argument names
- Use the component that saves to GCS unless the query results only need to live within the pipeline

That's all.
