I will summarize how to send a query from Kubeflow Pipelines to BigQuery and save the query results, in the following three patterns:

1. CSV file
2. GCS
3. BigQuery

Along the way, I will also note the implementation caveats that came to mind.
import sys
sys.version
"""
'3.7.7 (default, May 6 2020, 04:59:01) \n[Clang 4.0.1 (tags/RELEASE_401/final)]'
"""
import kfp
kfp.__version__
"""
'1.0.0'
"""
As of January 2021, the latest version of kfp, the Python SDK for Kubeflow Pipelines, is 1.3.0, but I am using 1.0.0 because that is what was installed in my execution environment (AI Platform Pipelines).
The KFP component that submits a query to BigQuery has existed since around July 2020, but because its base image used Python 2.7, it raised an **encoding error** whenever the query contained Japanese.
The base image was updated to Python 3.7 in a PR merged just the other day, so queries containing Japanese are now processed correctly.
In other words, as of January 2021, if your query contains Japanese and you do not pin the component URL as shown below, **a component built on a Python 2 image will be used and the run will fail with an encoding error**, so be careful.
'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/...'  # use the component definitions under this 1.3.0 path
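Concretely, pinning to that release just means loading the component from the versioned URL. A minimal example using the CSV variant that appears later in this article:

from kfp import components as comp

# Load the BigQuery query (to CSV) component from the 1.3.0 release,
# whose base image already uses Python 3
bigquery_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_CSV/component.yaml'
bigquery_query_op = comp.load_component_from_url(bigquery_op_url)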
The samples in this article assume the following declarations.
import kfp
from kfp import dsl
from kfp import components as comp
from kfp.components import func_to_container_op
from kfp.components import InputPath
HOST = 'URL of your Kubeflow Pipelines host'
PROJECT_ID = 'your GCP project ID'
QUERY = '''
SELECT
*
FROM
`bigquery-public-data.stackoverflow.posts_questions`
LIMIT
10
--This is a test
'''
Every pipeline in this article is executed like this:
result = kfp.Client(host=HOST).create_run_from_pipeline_func(pipeline, arguments={})
result
"""
Experiment link here
Run link here
RunPipelineResult(run_id=ee82166c-707b-4e5f-84d2-5d98d7189023)
"""
First, the pattern of saving to a CSV file. Declare the output file name and load the component.
#CSV file name
FILENAME = 'query_result.csv'
#BigQuery to CSV component URL
bigquery_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_CSV/component.yaml'
bigquery_query_op = comp.load_component_from_url(bigquery_op_url)
help(bigquery_query_op)
"""
Help on function Bigquery - Query:
Bigquery - Query(query: str, project_id: 'GCPProjectID', job_config: dict = '', output_filename: str = 'bq_results.csv')
Bigquery - Query
A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery and
store the results to a csv file.
"""
The help function shows the arguments the component accepts, so set the arguments while referring to this output.
Confirm that the result was output to CSV with the following two tasks:

- task 1. Print the output path
- task 2. Read the CSV from the output path and print its shape
# task 1
@func_to_container_op
def print_op(text: InputPath('CSV')) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

# task 2
@func_to_container_op
def handle_csv_op(path: InputPath('CSV')) -> None:
    print(f'path: {path}')
    print(f'type: {type(path)}')
    import subprocess
    subprocess.run(['pip', 'install', 'pandas'])
    import pandas as pd
    df = pd.read_csv(path)
    print(f'shape: {df.shape}')

# bonus
@func_to_container_op
def print_op_non_type(text) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

# pipeline
@dsl.pipeline(
    name='Bigquery query pipeline name',
    description='Bigquery query pipeline'
)
def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME)
    print_op(bq_task.outputs['table'])  # task 1
    handle_csv_op(f"{bq_task.outputs['table']}/{FILENAME}")  # task 2
    print_op_non_type(bq_task.outputs['table'])  # bonus
# print_op log
text: /tmp/inputs/text/data
type: <class 'str'>
# handle_csv_op log
path: /tmp/inputs/path/data
type: <class 'str'>
shape: (10, 20)
# print_op_non_type log
text: ,id,title,body,accepted_answer_id,answer_count,comment_count,community_owned_date,creation_date,favorite_count,last_activity_date,last_edit_date,last_editor_display_name,last_editor_user_id,owner_display_name,owner_user_id,parent_id,post_type_id,score,tags,view_count
0,65070674,NewRelic APM cpu usage shows incorrect values in comparison to K8S cluster cpu chart,"<p>Here goes charts of CPU usage of same pod. <strong>chart 1</strong> is from k8s cluster, <strong>chart 2</strong> is from APM.</p>
<ol></ol>"
...
type: <class 'str'>
From the execution logs, you can see the following:

- The path received by InputPath('CSV') is something like /tmp/inputs/<variable name>/data
- The file name specified by the argument does not appear in the component output (bq_task.outputs['table'])
# print_op log
text: /tmp/inputs/text/data
# handle_csv_op log
# The f-string "{bq_task.outputs['table']}/{FILENAME}" is passed as the argument, but FILENAME does not appear in the path
path: /tmp/inputs/path/data
- When passing the query result to the next component, if you do not declare the argument type as InputPath('CSV'), the query result itself is passed as a string.
# print_op_non_type log
text: ,id,title,body,accepted_answer_id,answer_count,comment_count,community_owned_date,creation_date,favorite_count,last_activity_date,last_edit_date,last_editor_display_name,last_editor_user_id,owner_display_name,owner_user_id,parent_id,post_type_id,score,tags,view_count
0,65070674,NewRelic APM cpu usage shows incorrect values in comparison to K8S cluster cpu chart,"<p>Here goes charts of CPU usage of same pod. <strong>chart 1</strong> is from k8s cluster, <strong>chart 2</strong> is from APM.</p>
<ol></ol>"
... (omitted)
type: <class 'str'>
When the query result is passed as a string, **declaring the receiving component's argument as str makes the task fail with a type mismatch**, so passing a component's output in any form other than InputPath('xxx') appears to be discouraged.
... (omitted)
# Specify the argument type as str
@func_to_container_op
def print_op(text: str) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME)
    # Since the component output is a PipelineParam and the argument is declared as str,
    # the following task fails due to the argument type mismatch.
    print_op(bq_task.outputs['table'])  # task 1
As mentioned above, the output of a component (bq_task.outputs['table']) is a placeholder of type PipelineParam, so it cannot be concatenated with strings or otherwise operated on directly. That is why the program above builds the path with an f-string.
def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME)
    # The following fails because a PipelineParam cannot be concatenated with a str
    # print_op(bq_task.outputs['table'] + "/" + FILENAME) # task 1
    # This works: the f-string converts the placeholder via str()
    print_op(f"{bq_task.outputs['table']}/{FILENAME}")  # task 1
Care is needed when handling component outputs, because the actual values are only assigned at runtime in the pipeline.
Reference: Kubeflow - Pipeline Parameters
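As a side note, here is a minimal sketch of why the f-string works while concatenation does not, based on my understanding of the kfp 1.x SDK (the PipelineParam is constructed by hand purely for illustration):

from kfp import dsl

# A PipelineParam is just a placeholder; the backend fills in the real value at runtime
param = dsl.PipelineParam(name='table', op_name='bigquery-query')

# An f-string calls str() on the placeholder, yielding something like
# '{{pipelineparam:op=bigquery-query;name=table}}/query_result.csv'
print(f"{param}/query_result.csv")

# Direct concatenation is not supported and raises a TypeError
# param + '/query_result.csv'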
Next, the pattern of saving to GCS. Declare the destination path and load the component. As the output of the help function shows, it takes different arguments from the component that saves to a CSV file.
#Path to the file to save to GCS
BUCKET = 'Bucket name'
GCS_PATH = f'gs://{BUCKET}/query_from_kfp/query_result.csv'
bigquery_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_gcs/component.yaml'
bigquery_query_op = comp.load_component_from_url(bigquery_op_url)
help(bigquery_query_op)
"""
Help on function Bigquery - Query:
Bigquery - Query(query: str, project_id: 'GCPProjectID', dataset_id: str = '', table_id: str = '', output_gcs_path: 'GCSPath' = '', dataset_location: str = 'US', job_config: dict = '', output_kfp_path: str = '')
Bigquery - Query
A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery
service and dump outputs to a Google Cloud Storage blob.
"""
As before, confirm that the result was output to GCS with the following two tasks:

- task 1. Print the output GCS path
- task 2. Read the CSV from the output GCS path and print its shape
# task 1
@func_to_container_op
def print_op(text: InputPath('GCSPath')) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

# task 2
@func_to_container_op
def handle_csv_op(gcs_file_path: InputPath('GCSPath'), project: str) -> None:
    print(f'path: {gcs_file_path}')
    print(f'type: {type(gcs_file_path)}')
    import subprocess
    subprocess.run(['pip', 'install', 'google-cloud-storage', 'pandas'])
    from google.cloud import storage
    from io import BytesIO
    import pandas as pd
    client = storage.Client(project)
    # point 1
    with open(gcs_file_path, 'r') as f:
        path = f.read()
    # point 2
    with BytesIO() as f:
        client.download_blob_to_file(path, f)
        content = f.getvalue()
    df = pd.read_csv(BytesIO(content))
    print(f'shape: {df.shape}')

# pipeline
@dsl.pipeline(
    name='Bigquery query pipeline name',
    description='Bigquery query pipeline'
)
def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_gcs_path=GCS_PATH)
    print_op(bq_task.outputs['output_gcs_path'])  # task 1
    handle_task = handle_csv_op(gcs=bq_task.outputs['output_gcs_path'],
                                project=PROJECT_ID)  # task 2
# print_op log
text: /tmp/inputs/text/data
type: <class 'str'>
# handle_csv_op log
path: /tmp/inputs/gcs/data
type: <class 'str'>
shape: (10, 20)
The handle_csv_op component needs a somewhat quirky bit of processing. Because the query results are stored in GCS in this case, the output of the bigquery_query_op component is not the GCS path itself as a str, but **the path to a file in which the GCS path is written**.
Therefore, first read the GCS path out of that file as shown below,
# point 1
with open(gcs_file_path, 'r') as f:
    path = f.read()  # gs://{BUCKET}/query_from_kfp/query_result.csv
Get the contents of the file from GCS as follows:
# point 2
with BytesIO() as f:
    client.download_blob_to_file(path, f)
    content = f.getvalue()
df = pd.read_csv(BytesIO(content))
This behavior is because output_gcs_path is defined with the OutputPath type in the component definition file (https://github.com/kubeflow/pipelines/blob/eeb7f8f04ac50351fd578a583a8ddc7df1e00bdd/components/gcp/bigquery/query/to_gcs/component.yaml#L79). I wish it simply returned the path as a plain string... but why it is defined this way is a mystery to me.
If you declare a component argument with the InputPath type, **certain suffixes are stripped from the argument name**. For example, the handle_csv_op component declares the argument gcs_file_path, but when calling the component it must be referred to as gcs (the _file and _path suffixes are dropped).
# not gcs_file_path=bq_task.outputs['output_gcs_path']
handle_task = handle_csv_op(gcs=bq_task.outputs['output_gcs_path'],
                            project=PROJECT_ID)  # task 2
This is documented (see the reference below), but it is hard to find, and I got stuck on it. Kubeflow's documentation and tutorials are scattered all over the place, which makes them hard to track down.
Reference: Building Python function-based components - Passing parameters by value
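To make the naming rule concrete, here is a minimal sketch of my own (show_size_op and data_path are hypothetical names used only for illustration): an argument declared as data_path is exposed as data on the generated component.

@func_to_container_op
def show_size_op(data_path: InputPath()) -> None:
    import os
    # data_path is the local file that KFP materializes for this input
    print(f"size: {os.path.getsize(data_path)} bytes")

def naming_example_pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_gcs_path=GCS_PATH)
    # The argument was declared as data_path, but the _path suffix is stripped,
    # so it is passed as data here
    show_size_op(data=bq_task.outputs['output_gcs_path'])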
Finally, the pattern of submitting a query to BigQuery and writing the result to an arbitrary BigQuery table. Declare the destination table and load the components.
#Where to save the query results
DATASET_ID = 'mail_retention_pipeline'
TABLE_ID = 'query_result'
FILENAME = 'query_result.csv'
#Query for checking query results
VERIFY_QUERY = f'''
SELECT
*
FROM
`{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
'''
#A component that saves query results in BigQuery
bigquery_table_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_table/component.yaml'
bigquery_query_table_op = comp.load_component_from_url(bigquery_table_op_url)
#Component that outputs query results to CSV
bigquery_csv_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_CSV/component.yaml'
bigquery_query_csv_op = comp.load_component_from_url(bigquery_csv_op_url)
help(bigquery_query_table_op)
"""
Help on function Bigquery - Query:
Bigquery - Query(query: str, project_id: 'GCPProjectID', table: str, dataset_id: str = '', table_id: str = '', dataset_location: str = 'US', job_config: dict = '')
Bigquery - Query
A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery
service and dump outputs to new table.
"""
Confirm that the result is output to BigQuery with the following three tasks:

- task 1. Submit a query to BigQuery and save the result to BigQuery
- task 2. Get the query results from BigQuery and save them as CSV
- task 3. Read the CSV file and check its shape
# task 3
@func_to_container_op
def handle_csv_op(path: InputPath('CSV')) -> None:
    import subprocess
    subprocess.run(['pip', 'install', 'pandas'])
    import pandas as pd
    df = pd.read_csv(path)
    print(f'shape: {df.shape}')

@dsl.pipeline(
    name='Bigquery query pipeline name',
    description='Bigquery query pipeline'
)
def pipeline():
    # task 1: save the query results to BigQuery
    bq_table_task = bigquery_query_table_op(
        query=QUERY,
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=TABLE_ID,
        table='')
    # task 2: save the query results as CSV
    bq_csv_task = bigquery_query_csv_op(
        query=VERIFY_QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME).after(bq_table_task)
    handle_task = handle_csv_op(f"{bq_csv_task.outputs['table']}/{FILENAME}")  # task 3
# handle_csv_op log
path: /tmp/inputs/gcs/data
type: <class 'str'>
shape: (10, 20)
The DAG has a pipeline-like shape.
The bq_table_task component has a mysterious argument called table, and it will not run unless you pass it some string. As far as I can tell from the source code, this parameter is not actually used, so it looks like a fix that was simply overlooked.
#A component that saves query results in BigQuery
bigquery_table_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_table/component.yaml'
bigquery_query_table_op = comp.load_component_from_url(bigquery_table_op_url)
...
#There is a positional argument called table
help(bigquery_query_table_op)
"""
Help on function Bigquery - Query:
Bigquery - Query(query: str, project_id: 'GCPProjectID', table: str, dataset_id: str = '', table_id: str = '', dataset_location: str = 'US', job_config: dict = '')
Bigquery - Query
A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery
service and dump outputs to new table.
"""
When I checked the Kubeflow Pipelines repository, I found a PR that fixes this, so once it is merged this problem should go away.
The pattern of saving query results to BigQuery can actually also be achieved with the component that saves query results to GCS. As you can see from the help output, the component that stores query results in GCS also takes the dataset_id and table_id arguments.
#A component that saves query results in BigQuery
"""
Help on function Bigquery - Query:
Bigquery - Query(query: str, project_id: 'GCPProjectID', table: str, dataset_id: str = '', table_id: str = '', dataset_location: str = 'US', job_config: dict = '')
Bigquery - Query
A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery
service and dump outputs to new table.
"""
#A component that saves query results in GCS
"""
Help on function Bigquery - Query:
Bigquery - Query(query: str, project_id: 'GCPProjectID', dataset_id: str = '', table_id: str = '', output_gcs_path: 'GCSPath' = '', dataset_location: str = 'US', job_config: dict = '', output_kfp_path: str = '')
Bigquery - Query
A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery
service and dump outputs to a Google Cloud Storage blob.
"""
In other words, unless you are in the situation where the query results are needed only inside the pipeline and do not have to be saved anywhere, you can simply use the component that saves the query results to GCS.
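For illustration, here is a minimal sketch of that idea, reusing the names declared earlier (QUERY, PROJECT_ID, DATASET_ID, TABLE_ID, GCS_PATH). I have not verified this exact combination end to end, so treat it as a sketch rather than a confirmed recipe:

# The to_gcs component used in the GCS pattern above
bigquery_gcs_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_gcs/component.yaml'
bigquery_query_gcs_op = comp.load_component_from_url(bigquery_gcs_op_url)

def pipeline():
    # According to its signature, one call both materializes the result in
    # `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` and dumps it to GCS_PATH
    bq_task = bigquery_query_gcs_op(
        query=QUERY,
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=TABLE_ID,
        output_gcs_path=GCS_PATH)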
- Be careful about the version when using the official component that queries BigQuery
- If the component is old, Japanese cannot be used in query comments
- Pay attention to the input and output types of the components
- Certain suffixes are omitted from the component's keyword argument names
- Use the component that saves to GCS unless the query results only need to exist within the pipeline
That's all.